1use std::cmp::Ordering;
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::mem::size_of_val;
19use std::ops::Bound::Included;
20use std::ops::{Bound, RangeBounds};
21use std::sync::atomic::AtomicU64;
22use std::sync::atomic::Ordering::Relaxed;
23use std::sync::{Arc, LazyLock};
24
25use bytes::Bytes;
26use risingwave_common::catalog::TableId;
27use risingwave_common::metrics::LabelGuardedIntGauge;
28use risingwave_hummock_sdk::EpochWithGap;
29use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
30
31use crate::hummock::iterator::{
32 Backward, DirectionEnum, Forward, HummockIterator, HummockIteratorDirection, ValueMeta,
33};
34use crate::hummock::shared_buffer::TableMemoryMetrics;
35use crate::hummock::utils::range_overlap;
36use crate::hummock::value::HummockValue;
37use crate::hummock::{HummockEpoch, HummockResult};
38use crate::store::ReadOptions;
39
/// A value written into the shared buffer, tagged with the kind of write that produced it.
///
/// `Insert` and `Update` both carry the written payload; `Delete` carries none.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum SharedBufferValue<T> {
    /// Write of kind `Insert`, carrying the new payload.
    Insert(T),
    /// Write of kind `Update`, carrying the new payload.
    Update(T),
    /// Deletion of the key; no payload.
    Delete,
}
46
47impl<T> SharedBufferValue<T> {
48 fn to_ref(&self) -> SharedBufferValue<&T> {
49 match self {
50 SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val),
51 SharedBufferValue::Update(val) => SharedBufferValue::Update(val),
52 SharedBufferValue::Delete => SharedBufferValue::Delete,
53 }
54 }
55}
56
57impl<T> From<SharedBufferValue<T>> for HummockValue<T> {
58 fn from(val: SharedBufferValue<T>) -> HummockValue<T> {
59 match val {
60 SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
61 HummockValue::Put(val)
62 }
63 SharedBufferValue::Delete => HummockValue::Delete,
64 }
65 }
66}
67
68impl<'a, T: AsRef<[u8]>> SharedBufferValue<&'a T> {
69 pub(crate) fn to_slice(self) -> SharedBufferValue<&'a [u8]> {
70 match self {
71 SharedBufferValue::Insert(val) => SharedBufferValue::Insert(val.as_ref()),
72 SharedBufferValue::Update(val) => SharedBufferValue::Update(val.as_ref()),
73 SharedBufferValue::Delete => SharedBufferValue::Delete,
74 }
75 }
76}
77
78pub(crate) type SharedBufferItem = (TableKey<Bytes>, SharedBufferValue<Bytes>);
80pub type SharedBufferBatchId = u64;
81
82#[derive(Debug, PartialEq)]
83pub(crate) struct SharedBufferEntry {
84 pub(crate) key: TableKey<Bytes>,
85 pub(crate) value: SharedBufferValue<Bytes>,
86}
87
88#[derive(Debug)]
89pub(crate) struct SharedBufferBatchOldValues {
90 values: Vec<Bytes>,
93 pub size: usize,
94 pub global_old_value_size: LabelGuardedIntGauge,
95}
96
97impl Drop for SharedBufferBatchOldValues {
98 fn drop(&mut self) {
99 self.global_old_value_size.sub(self.size as _);
100 }
101}
102
103impl SharedBufferBatchOldValues {
104 pub(crate) fn new(
105 values: Vec<Bytes>,
106 size: usize,
107 global_old_value_size: LabelGuardedIntGauge,
108 ) -> Self {
109 global_old_value_size.add(size as _);
110 Self {
111 values,
112 size,
113 global_old_value_size,
114 }
115 }
116
117 pub(crate) fn for_test(values: Vec<Bytes>, size: usize) -> Self {
118 Self::new(values, size, LabelGuardedIntGauge::test_int_gauge::<1>())
119 }
120}
121
122#[derive(Debug)]
123pub(crate) struct SharedBufferBatchInner {
124 entries: Vec<SharedBufferEntry>,
125 old_values: Option<SharedBufferBatchOldValues>,
126 epoch_with_gap: EpochWithGap,
127 size: usize,
129 per_table_tracker: Arc<TableMemoryMetrics>,
130 batch_id: SharedBufferBatchId,
133}
134
135impl SharedBufferBatchInner {
136 pub(crate) fn new(
137 epoch: HummockEpoch,
138 spill_offset: u16,
139 payload: Vec<SharedBufferItem>,
140 old_values: Option<SharedBufferBatchOldValues>,
141 size: usize,
142 table_metrics: Arc<TableMemoryMetrics>,
143 ) -> Self {
144 assert!(!payload.is_empty());
145 debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
146 if let Some(old_values) = &old_values {
147 assert_eq!(old_values.values.len(), payload.len());
148 }
149
150 let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
151 let entries = payload
152 .into_iter()
153 .map(|(key, value)| SharedBufferEntry { key, value })
154 .collect();
155
156 let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed);
157
158 table_metrics.inc_imm(size);
159
160 SharedBufferBatchInner {
161 entries,
162 old_values,
163 epoch_with_gap,
164 size,
165 per_table_tracker: table_metrics,
166 batch_id,
167 }
168 }
169
170 fn get_value<'a>(
173 &'a self,
174 table_key: TableKey<&[u8]>,
175 read_epoch: HummockEpoch,
176 ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
177 if let Ok(i) = self
179 .entries
180 .binary_search_by(|m| (m.key.as_ref()).cmp(*table_key))
181 {
182 let SharedBufferEntry { key, value } = &self.entries[i];
183 debug_assert_eq!(key.as_ref(), *table_key);
184 if read_epoch >= self.epoch_with_gap.pure_epoch() {
185 return Some((value.to_ref().into(), self.epoch_with_gap));
186 }
187 }
188
189 None
190 }
191}
192
193impl Drop for SharedBufferBatchInner {
194 fn drop(&mut self) {
195 self.per_table_tracker.dec_imm(self.size);
196 }
197}
198
/// Process-wide counter, starting at 0, from which each new batch takes its id.
pub static SHARED_BUFFER_BATCH_ID_GENERATOR: LazyLock<AtomicU64> =
    LazyLock::new(AtomicU64::default);
201
202#[derive(Clone, Debug)]
204pub struct SharedBufferBatch {
205 pub(crate) inner: Arc<SharedBufferBatchInner>,
206 pub table_id: TableId,
207}
208
209impl SharedBufferBatch {
210 pub fn for_test(
211 sorted_items: Vec<SharedBufferItem>,
212 epoch: HummockEpoch,
213 table_id: TableId,
214 ) -> Self {
215 Self::for_test_inner(sorted_items, None, epoch, table_id)
216 }
217
218 pub fn for_test_with_old_values(
219 sorted_items: Vec<SharedBufferItem>,
220 old_values: Vec<Bytes>,
221 epoch: HummockEpoch,
222 table_id: TableId,
223 ) -> Self {
224 Self::for_test_inner(sorted_items, Some(old_values), epoch, table_id)
225 }
226
227 fn for_test_inner(
228 sorted_items: Vec<SharedBufferItem>,
229 old_values: Option<Vec<Bytes>>,
230 epoch: HummockEpoch,
231 table_id: TableId,
232 ) -> Self {
233 let (size, old_value_size) = Self::measure_batch_size(&sorted_items, old_values.as_deref());
234
235 let old_values = old_values
236 .map(|old_values| SharedBufferBatchOldValues::for_test(old_values, old_value_size));
237
238 Self {
239 inner: Arc::new(SharedBufferBatchInner::new(
240 epoch,
241 0,
242 sorted_items,
243 old_values,
244 size,
245 TableMemoryMetrics::for_test(),
246 )),
247 table_id,
248 }
249 }
250
251 pub fn measure_delete_range_size(batch_items: &[(Bound<Bytes>, Bound<Bytes>)]) -> usize {
252 batch_items
253 .iter()
254 .map(|(left, right)| {
255 let l1 = match left {
257 Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
258 Bound::Unbounded => 13,
259 };
260 let l2 = match right {
261 Bound::Excluded(x) | Bound::Included(x) => x.len() + 13,
262 Bound::Unbounded => 13,
263 };
264 l1 + l2
265 })
266 .sum()
267 }
268
269 pub fn measure_batch_size(
271 batch_items: &[SharedBufferItem],
272 old_values: Option<&[Bytes]>,
273 ) -> (usize, usize) {
274 let old_value_size = old_values
275 .iter()
276 .flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len()))
277 .sum::<usize>();
278 let kv_size = batch_items
280 .iter()
281 .map(|(k, v)| {
282 k.len() + {
283 match v {
284 SharedBufferValue::Insert(val) | SharedBufferValue::Update(val) => {
285 val.len()
286 }
287 SharedBufferValue::Delete => 0,
288 }
289 }
290 })
291 .sum::<usize>();
292 (kv_size + old_value_size, old_value_size)
293 }
294
295 pub fn filter<R, B>(&self, table_id: TableId, table_key_range: &R) -> bool
296 where
297 R: RangeBounds<TableKey<B>>,
298 B: AsRef<[u8]>,
299 {
300 let left = table_key_range
301 .start_bound()
302 .as_ref()
303 .map(|key| TableKey(key.0.as_ref()));
304 let right = table_key_range
305 .end_bound()
306 .as_ref()
307 .map(|key| TableKey(key.0.as_ref()));
308 self.table_id == table_id
309 && range_overlap(
310 &(left, right),
311 &self.start_table_key(),
312 Included(&self.end_table_key()),
313 )
314 }
315
316 pub fn table_id(&self) -> TableId {
317 self.table_id
318 }
319
320 pub fn key_count(&self) -> usize {
321 self.inner.entries.len()
322 }
323
324 pub fn value_count(&self) -> usize {
325 self.inner.entries.len()
326 }
327
328 pub fn has_old_value(&self) -> bool {
329 self.inner.old_values.is_some()
330 }
331
332 pub fn get<'a>(
333 &'a self,
334 table_key: TableKey<&[u8]>,
335 read_epoch: HummockEpoch,
336 _read_options: &ReadOptions,
337 ) -> Option<(HummockValue<&'a Bytes>, EpochWithGap)> {
338 self.inner.get_value(table_key, read_epoch)
339 }
340
341 pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool {
342 self.inner
343 .entries
344 .binary_search_by(|m| {
345 let key = &m.key;
346 let too_left = match &table_key_range.0 {
347 std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(),
348 std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(),
349 std::ops::Bound::Unbounded => false,
350 };
351 if too_left {
352 return Ordering::Less;
353 }
354
355 let too_right = match &table_key_range.1 {
356 std::ops::Bound::Included(range_end) => range_end.as_ref() < key.as_ref(),
357 std::ops::Bound::Excluded(range_end) => range_end.as_ref() <= key.as_ref(),
358 std::ops::Bound::Unbounded => false,
359 };
360 if too_right {
361 return Ordering::Greater;
362 }
363
364 Ordering::Equal
365 })
366 .is_ok()
367 }
368
369 pub fn into_directed_iter<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>(
370 self,
371 ) -> SharedBufferBatchIterator<D, IS_NEW_VALUE> {
372 SharedBufferBatchIterator::<D, IS_NEW_VALUE>::new(self.inner, self.table_id)
373 }
374
375 pub fn into_old_value_iter(self) -> SharedBufferBatchIterator<Forward, false> {
376 self.into_directed_iter()
377 }
378
379 pub fn into_forward_iter(self) -> SharedBufferBatchIterator<Forward> {
380 self.into_directed_iter()
381 }
382
383 pub fn into_backward_iter(self) -> SharedBufferBatchIterator<Backward> {
384 self.into_directed_iter()
385 }
386
387 #[inline(always)]
388 pub fn start_table_key(&self) -> TableKey<&[u8]> {
389 TableKey(self.inner.entries.first().expect("non-empty").key.as_ref())
390 }
391
392 #[inline(always)]
393 pub fn end_table_key(&self) -> TableKey<&[u8]> {
394 TableKey(self.inner.entries.last().expect("non-empty").key.as_ref())
395 }
396
397 #[inline(always)]
398 pub fn raw_largest_key(&self) -> &TableKey<Bytes> {
399 &self.inner.entries.last().expect("non-empty").key
400 }
401
402 pub fn start_user_key(&self) -> UserKey<&[u8]> {
405 UserKey::new(self.table_id, self.start_table_key())
406 }
407
408 pub fn size(&self) -> usize {
409 self.inner.size
410 }
411
412 pub fn batch_id(&self) -> SharedBufferBatchId {
413 self.inner.batch_id
414 }
415
416 pub fn epoch(&self) -> HummockEpoch {
417 self.inner.epoch_with_gap.pure_epoch()
418 }
419
420 pub(crate) fn build_shared_buffer_batch(
421 epoch: HummockEpoch,
422 spill_offset: u16,
423 sorted_items: Vec<SharedBufferItem>,
424 old_values: Option<SharedBufferBatchOldValues>,
425 size: usize,
426 table_id: TableId,
427 table_metrics: Arc<TableMemoryMetrics>,
428 ) -> Self {
429 let inner = SharedBufferBatchInner::new(
430 epoch,
431 spill_offset,
432 sorted_items,
433 old_values,
434 size,
435 table_metrics,
436 );
437 SharedBufferBatch {
438 inner: Arc::new(inner),
439 table_id,
440 }
441 }
442
443 #[cfg(any(test, feature = "test"))]
444 pub fn build_shared_buffer_batch_for_test(
445 epoch: HummockEpoch,
446 spill_offset: u16,
447 sorted_items: Vec<SharedBufferItem>,
448 size: usize,
449 table_id: TableId,
450 ) -> Self {
451 let inner = SharedBufferBatchInner::new(
452 epoch,
453 spill_offset,
454 sorted_items,
455 None,
456 size,
457 TableMemoryMetrics::for_test(),
458 );
459 SharedBufferBatch {
460 inner: Arc::new(inner),
461 table_id,
462 }
463 }
464}
465
466pub struct SharedBufferBatchIterator<D: HummockIteratorDirection, const IS_NEW_VALUE: bool = true> {
469 inner: Arc<SharedBufferBatchInner>,
470 current_entry_idx: usize,
472 table_id: TableId,
473 _phantom: PhantomData<D>,
474}
475
476impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool>
477 SharedBufferBatchIterator<D, IS_NEW_VALUE>
478{
479 pub(crate) fn new(inner: Arc<SharedBufferBatchInner>, table_id: TableId) -> Self {
480 if !IS_NEW_VALUE {
481 assert!(
482 inner.old_values.is_some(),
483 "create old value iter with no old value: {:?}",
484 table_id
485 );
486 }
487 Self {
488 inner,
489 current_entry_idx: 0,
490 table_id,
491 _phantom: Default::default(),
492 }
493 }
494
495 fn is_valid_entry_idx(&self) -> bool {
496 self.current_entry_idx < self.inner.entries.len()
497 }
498
499 fn invalidate(&mut self) {
500 self.current_entry_idx = self.inner.entries.len();
501 }
502
503 fn advance_to_next_entry(&mut self) {
504 debug_assert!(self.is_valid_entry_idx());
505 match D::direction() {
506 DirectionEnum::Forward => {
507 self.current_entry_idx += 1;
508 }
509 DirectionEnum::Backward => {
510 if self.current_entry_idx == 0 {
511 self.invalidate();
512 } else {
513 self.current_entry_idx -= 1;
514 }
515 }
516 }
517 }
518
519 fn assert_valid_idx(&self) {
520 debug_assert!(self.is_valid_entry_idx());
521 if !IS_NEW_VALUE {
522 debug_assert!(!matches!(
523 self.inner.entries[self.current_entry_idx].value,
524 SharedBufferValue::Insert(_)
525 ));
526 }
527 }
528
529 fn advance_until_valid_old_value(&mut self) {
530 debug_assert!(!IS_NEW_VALUE);
531 while self.is_valid_entry_idx()
532 && matches!(
533 self.inner.entries[self.current_entry_idx].value,
534 SharedBufferValue::Insert(_)
535 )
536 {
537 self.advance_to_next_entry();
538 }
539 }
540}
541
542impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool> HummockIterator
543 for SharedBufferBatchIterator<D, IS_NEW_VALUE>
544{
545 type Direction = D;
546
547 async fn next(&mut self) -> HummockResult<()> {
548 self.advance_to_next_entry();
549 if !IS_NEW_VALUE {
550 self.advance_until_valid_old_value();
551 }
552 Ok(())
553 }
554
555 fn key(&self) -> FullKey<&[u8]> {
556 self.assert_valid_idx();
557 let entry = &self.inner.entries[self.current_entry_idx];
558 FullKey::new_with_gap_epoch(
559 self.table_id,
560 TableKey(entry.key.as_ref()),
561 self.inner.epoch_with_gap,
562 )
563 }
564
565 fn value(&self) -> HummockValue<&[u8]> {
566 self.assert_valid_idx();
567 if IS_NEW_VALUE {
568 self.inner.entries[self.current_entry_idx]
569 .value
570 .to_ref()
571 .to_slice()
572 .into()
573 } else {
574 HummockValue::put(
575 self.inner.old_values.as_ref().unwrap().values[self.current_entry_idx].as_ref(),
576 )
577 }
578 }
579
580 fn is_valid(&self) -> bool {
581 self.is_valid_entry_idx()
582 }
583
584 async fn rewind(&mut self) -> HummockResult<()> {
585 match D::direction() {
586 DirectionEnum::Forward => {
587 self.current_entry_idx = 0;
588 }
589 DirectionEnum::Backward => {
590 self.current_entry_idx = self.inner.entries.len() - 1;
591 }
592 };
593 if !IS_NEW_VALUE {
594 self.advance_until_valid_old_value();
595 }
596 Ok(())
597 }
598
599 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
600 match key.user_key.table_id.cmp(&self.table_id) {
601 Ordering::Less => {
602 match D::direction() {
603 DirectionEnum::Forward => {
604 self.rewind().await?;
606 return Ok(());
607 }
608 DirectionEnum::Backward => {
609 self.invalidate();
610 return Ok(());
611 }
612 };
613 }
614 Ordering::Greater => {
615 match D::direction() {
616 DirectionEnum::Forward => {
617 self.invalidate();
618 return Ok(());
619 }
620 DirectionEnum::Backward => {
621 self.rewind().await?;
623 return Ok(());
624 }
625 };
626 }
627 Ordering::Equal => (),
628 }
629 let partition_point = self
632 .inner
633 .entries
634 .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key));
635 match partition_point {
636 Ok(i) => {
637 self.current_entry_idx = i;
638 let skip_on_epoch = match D::direction() {
644 DirectionEnum::Forward => {
645 key.epoch_with_gap < self.inner.epoch_with_gap
647 }
648 DirectionEnum::Backward => {
649 key.epoch_with_gap > self.inner.epoch_with_gap
651 }
652 };
653 if skip_on_epoch {
654 self.advance_to_next_entry()
655 }
656 }
657 Err(i) => match D::direction() {
658 DirectionEnum::Forward => {
659 self.current_entry_idx = i;
660 }
661 DirectionEnum::Backward => {
662 if i == 0 {
663 self.invalidate();
664 } else {
665 self.current_entry_idx = i - 1;
666 }
667 }
668 },
669 };
670 if !IS_NEW_VALUE {
671 self.advance_until_valid_old_value();
672 }
673 Ok(())
674 }
675
676 fn collect_local_statistic(&self, _stats: &mut crate::monitor::StoreLocalStatistic) {}
677
678 fn value_meta(&self) -> ValueMeta {
679 ValueMeta::default()
680 }
681}
682
#[cfg(test)]
mod tests {
    use std::ops::Bound::Excluded;

    use itertools::{Itertools, zip_eq};
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_hummock_sdk::key::map_table_key_range;

    use super::*;
    use crate::hummock::iterator::test_utils::{
        iterator_test_key_of_epoch, iterator_test_table_key_of, transform_shared_buffer,
    };

    /// Converts shared-buffer test items into the `HummockValue` form that the iterators
    /// yield, so iterator output can be compared against them directly.
    fn to_hummock_value_batch(
        items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
    ) -> Vec<(Vec<u8>, HummockValue<Bytes>)> {
        items.into_iter().map(|(k, v)| (k, v.into())).collect()
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_basic() {
        let epoch = test_epoch(1);
        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![
            (
                iterator_test_table_key_of(0),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // Key span of the batch.
        assert_eq!(
            *shared_buffer_batch.start_table_key(),
            shared_buffer_items[0].0
        );
        assert_eq!(
            *shared_buffer_batch.end_table_key(),
            shared_buffer_items[2].0
        );

        // Point lookups of every existing key.
        for (k, v) in &shared_buffer_items {
            assert_eq!(
                shared_buffer_batch
                    .get(TableKey(k.as_slice()), epoch, &ReadOptions::default())
                    .unwrap()
                    .0
                    .as_slice(),
                v.as_slice()
            );
        }
        // Point lookups of absent keys.
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(3).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );
        assert_eq!(
            shared_buffer_batch.get(
                TableKey(iterator_test_table_key_of(4).as_slice()),
                epoch,
                &ReadOptions::default()
            ),
            None
        );

        // Forward iteration yields every item in order.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.rewind().await.unwrap();
        let mut output = vec![];
        while iter.is_valid() {
            output.push((
                iter.key().user_key.table_key.to_vec(),
                iter.value().to_bytes(),
            ));
            iter.next().await.unwrap();
        }
        assert_eq!(output, shared_buffer_items);

        // Backward iteration yields every item in reverse order.
        let mut backward_iter = shared_buffer_batch.clone().into_backward_iter();
        backward_iter.rewind().await.unwrap();
        let mut output = vec![];
        while backward_iter.is_valid() {
            output.push((
                backward_iter.key().user_key.table_key.to_vec(),
                backward_iter.value().to_bytes(),
            ));
            backward_iter.next().await.unwrap();
        }
        output.reverse();
        assert_eq!(output, shared_buffer_items);
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_seek() {
        let epoch = test_epoch(1);
        let shared_buffer_items = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items.clone()),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(shared_buffer_items);

        // Forward: seek before the first key.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Forward: seek past the last key.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // Forward: seek to an existing key at the batch epoch.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Forward: seek to an existing key with a newer epoch keeps that key.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(2)).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Forward: seek to an existing key with an older epoch skips that key.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, test_epoch(0)).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // Backward: seek before the first key.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // Backward: seek past the last key.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(4, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items.iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Backward: seek to an existing key at the batch epoch.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Backward: seek to an existing key with an older epoch keeps that key.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        for item in shared_buffer_items[0..=1].iter().rev() {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Backward: seek to an existing key with a newer epoch skips that key.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        assert!(iter.is_valid());
        let item = shared_buffer_items.first().unwrap();
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_old_value_iter() {
        let epoch = test_epoch(1);
        let key_values = vec![
            (
                iterator_test_table_key_of(1),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                iterator_test_table_key_of(2),
                SharedBufferValue::Update(Bytes::from("value2")),
            ),
            (
                iterator_test_table_key_of(3),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
            (iterator_test_table_key_of(4), SharedBufferValue::Delete),
        ];
        let old_values = vec![
            Bytes::new(),
            Bytes::from("old_value2"),
            Bytes::new(),
            Bytes::from("old_value4"),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test_with_old_values(
            transform_shared_buffer(key_values.clone()),
            old_values.clone(),
            epoch,
            Default::default(),
        );
        let shared_buffer_items = to_hummock_value_batch(key_values.clone());
        // The old-value iterator only yields entries that are not inserts.
        let expected_old_value_iter_items = zip_eq(&key_values, &old_values)
            .filter(|((_, new_value), _)| !matches!(new_value, SharedBufferValue::Insert(_)))
            .map(|((key, _), old_value)| (key.clone(), HummockValue::Put(old_value)))
            .collect_vec();

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.rewind().await.unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Seek before the first key.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(0, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Seek past the last key.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(5, epoch).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // Seek to an existing key at the batch epoch.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Seek to an existing key with a newer epoch keeps that key.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &shared_buffer_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.next_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Seek to an existing key with an older epoch skips that key (and the following
        // insert, for the old-value iterator).
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(2, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch.prev_epoch()).to_ref())
            .await
            .unwrap();
        let item = shared_buffer_items.last().unwrap();
        assert!(iter.is_valid());
        assert_eq!(*iter.key().user_key.table_key, item.0.as_slice());
        assert_eq!(iter.value(), item.1.as_slice());
        iter.next().await.unwrap();
        assert!(!iter.is_valid());

        // Seeking onto an insert moves the old-value iterator to the next non-insert.
        let mut iter = shared_buffer_batch.clone().into_old_value_iter();
        iter.seek(iterator_test_key_of_epoch(3, epoch).to_ref())
            .await
            .unwrap();
        for item in &expected_old_value_iter_items[1..] {
            assert!(iter.is_valid());
            assert_eq!(*iter.key().user_key.table_key, item.0);
            assert_eq!(iter.value(), item.1.as_slice());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }

    /// Building a batch from an empty payload is rejected by the constructor.
    ///
    /// This replaces the former `test_invalid_table_id`, which only passed because of this
    /// same panic: its seek with a foreign table id was never reached, and such a seek is
    /// valid by design (see `test_shared_buffer_batch_seek_bug`).
    #[test]
    #[should_panic(expected = "!payload.is_empty()")]
    fn test_empty_batch_panics() {
        let epoch = test_epoch(1);
        let _ = SharedBufferBatch::for_test(vec![], epoch, Default::default());
    }

    #[test]
    fn test_shared_buffer_batch_range_exists() {
        let epoch = test_epoch(1);
        let shared_buffer_items = vec![
            (
                Vec::from("a_1"),
                SharedBufferValue::Insert(Bytes::from("value1")),
            ),
            (
                Vec::from("a_3"),
                SharedBufferValue::Insert(Bytes::from("value2")),
            ),
            (
                Vec::from("a_5"),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
            (
                Vec::from("b_2"),
                SharedBufferValue::Insert(Bytes::from("value3")),
            ),
        ];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items),
            epoch,
            Default::default(),
        );

        // Ranges containing at least one key.
        let range = (Included(Bytes::from("a")), Excluded(Bytes::from("b")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("b_")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_1")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_1")), Included(Bytes::from("a_2")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_0x")), Included(Bytes::from("a_2x")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a_")), Excluded(Bytes::from("c_")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_0x")), Included(Bytes::from("b_2x")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_2")), Excluded(Bytes::from("c_1x")));
        assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));

        // Ranges containing no key.
        let range = (Included(Bytes::from("a_0")), Excluded(Bytes::from("a_1")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("a__0")), Excluded(Bytes::from("a__5")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_1")), Excluded(Bytes::from("b_2")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b_3")), Excluded(Bytes::from("c_1")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
        let range = (Included(Bytes::from("b__x")), Excluded(Bytes::from("c__x")));
        assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
    }

    #[tokio::test]
    async fn test_shared_buffer_batch_seek_bug() {
        let epoch = test_epoch(1);
        let table_id = TableId::new(100);
        let shared_buffer_items: Vec<(Vec<u8>, SharedBufferValue<Bytes>)> = vec![(
            iterator_test_table_key_of(1),
            SharedBufferValue::Insert(Bytes::from("value1")),
        )];
        let shared_buffer_batch = SharedBufferBatch::for_test(
            transform_shared_buffer(shared_buffer_items),
            epoch,
            table_id,
        );

        // Forward: a smaller table id sorts before every key of the batch, regardless of
        // the table-key part of the seek key.
        let mut iter = shared_buffer_batch.clone().into_forward_iter();
        let seek_key = FullKey::for_test(TableId::new(99), iterator_test_table_key_of(2), epoch);
        iter.seek(seek_key.to_ref()).await.unwrap();

        assert!(
            iter.is_valid(),
            "Iterator should be valid when seeking with smaller table_id, even if the key part is larger"
        );
        assert_eq!(iter.key().user_key.table_id, table_id);

        // Backward: a larger table id sorts after every key of the batch, regardless of
        // the table-key part of the seek key.
        let mut iter = shared_buffer_batch.clone().into_backward_iter();
        let seek_key = FullKey::for_test(TableId::new(101), iterator_test_table_key_of(0), epoch);
        iter.seek(seek_key.to_ref()).await.unwrap();

        assert!(
            iter.is_valid(),
            "Iterator should be valid when seeking with larger table_id, even if the key part is smaller"
        );
        assert_eq!(iter.key().user_key.table_id, table_id);
    }
}