use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::vec_deque::VecDeque;
use std::ops::Bound::Included;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use futures::future::try_join_all;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::{
    PkPrefixTableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::LevelType;
use sync_point::sync_point;
use tracing::warn;

use crate::error::StorageResult;
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::iterator::{
    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::utils::{
    filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
    search_sst_idx,
};
use crate::hummock::{
    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
    hit_sstable_bloom_filter,
};
use crate::mem_table::{
    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
};
use crate::monitor::{
    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
};
use crate::store::{
    OnNearestItemFn, ReadLogOptions, ReadOptions, Vector, VectorNearestOptions, gen_min_epoch,
};
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

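/// The committed version used by read paths: a pinned `HummockVersion`.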
pub type CommittedVersion = PinnedVersion;

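/// SSTs flushed from immutable memtables but not yet committed into a new
/// pinned version, together with the imms they replaced (`imm_ids`, keyed by
/// local instance) and the epochs they cover.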
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    sstable_infos: Vec<LocalSstableInfo>,
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    epochs: Vec<HummockEpoch>,
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    imm_size: usize,
}

impl StagingSstableInfo {
    pub fn new(
        sstable_infos: Vec<LocalSstableInfo>,
        old_value_sstable_infos: Vec<LocalSstableInfo>,
        epochs: Vec<HummockEpoch>,
        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
        imm_size: usize,
    ) -> Self {
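        // `epochs` must be sorted from newer to older.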
        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
        Self {
            sstable_infos,
            old_value_sstable_infos,
            epochs,
            imm_ids,
            imm_size,
        }
    }

    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.sstable_infos
    }

    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.old_value_sstable_infos
    }

    pub fn imm_size(&self) -> usize {
        self.imm_size
    }

    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.epochs
    }

    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
        &self.imm_ids
    }
}

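/// An update applied to a `HummockReadVersion`: newly uploaded staging SSTs, a
/// newly committed snapshot, or a new table watermark.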
pub enum VersionUpdate {
    Sst(Arc<StagingSstableInfo>),
    CommittedSnapshot(CommittedVersion),
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}

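/// Uncommitted data staged locally: imms waiting to be uploaded
/// (`pending_imms`, newest at the back), imms currently being uploaded
/// (`uploading_imms`, newest at the front), and uploaded-but-uncommitted SSTs
/// (`sst`, newest at the front).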
#[derive(Clone)]
pub struct StagingVersion {
    pending_imm_size: usize,
    pub pending_imms: Vec<ImmutableMemtable>,

    pub uploading_imms: VecDeque<ImmutableMemtable>,

    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}

impl StagingVersion {
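    /// Returns the imms and staging SSTs that may contain data for `table_id`
    /// within `table_key_range`, restricted to epochs no newer than
    /// `max_epoch_inclusive`. Imms are yielded from newest to oldest.
    ///
    /// A minimal usage sketch (illustrative; assumes a populated staging
    /// version, mirroring `read_filter_for_version` below):
    ///
    /// ```ignore
    /// let (imm_iter, sst_iter) = staging.prune_overlap(epoch, table_id, &key_range);
    /// let imms: Vec<_> = imm_iter.cloned().collect();
    /// let ssts: Vec<_> = sst_iter.cloned().collect();
    /// ```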
    pub fn prune_overlap<'a>(
        &'a self,
        max_epoch_inclusive: HummockEpoch,
        table_id: TableId,
        table_key_range: &'a TableKeyRange,
    ) -> (
        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
        impl Iterator<Item = &'a SstableInfo> + 'a,
    ) {
        let (left, right) = table_key_range;
        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
        let overlapped_imms = self
            .pending_imms
            .iter()
            .rev()
            .chain(self.uploading_imms.iter())
            .filter(move |imm| {
                imm.min_epoch() <= max_epoch_inclusive
                    && imm.table_id == table_id
                    && range_overlap(
                        &(left, right),
                        &imm.start_table_key(),
                        Included(&imm.end_table_key()),
                    )
            });

        let overlapped_ssts = self
            .sst
            .iter()
            .filter(move |staging_sst| {
                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
                sst_max_epoch <= max_epoch_inclusive
            })
            .flat_map(move |staging_sst| {
                staging_sst
                    .sstable_infos
                    .iter()
                    .map(|sstable| &sstable.sst_info)
                    .filter(move |sstable: &&SstableInfo| {
                        filter_single_sst(sstable, table_id, table_key_range)
                    })
            });
        (overlapped_imms, overlapped_ssts)
    }

    pub fn is_empty(&self) -> bool {
        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
    }
}

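/// A read snapshot for a local state-store instance of one table: local staging
/// data (imms and uncommitted SSTs) plus the latest committed version.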
#[derive(Clone)]
pub struct HummockReadVersion {
    table_id: TableId,
    instance_id: LocalInstanceId,

    staging: StagingVersion,

    committed: CommittedVersion,

    is_replicated: bool,

    table_watermarks: Option<PkPrefixTableWatermarksIndex>,

    vnodes: Arc<Bitmap>,
}

impl HummockReadVersion {
    pub fn new_with_replication_option(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        is_replicated: bool,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        assert!(committed_version.is_valid());
        Self {
            table_id,
            instance_id,
            table_watermarks: {
                match committed_version.table_watermarks.get(&table_id) {
                    Some(table_watermarks) => match table_watermarks.watermark_type {
                        WatermarkSerdeType::PkPrefix => {
                            Some(PkPrefixTableWatermarksIndex::new_committed(
                                table_watermarks.clone(),
                                committed_version
                                    .state_table_info
                                    .info()
                                    .get(&table_id)
                                    .expect("should exist")
                                    .committed_epoch,
                            ))
                        }

                        WatermarkSerdeType::NonPkPrefix => None,
                    },
                    None => None,
                }
            },
            staging: StagingVersion {
                pending_imm_size: 0,
                pending_imms: Vec::default(),
                uploading_imms: VecDeque::default(),
                sst: VecDeque::default(),
            },

            committed: committed_version,

            is_replicated,
            vnodes,
        }
    }

    pub fn new(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

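    /// Adds a new immutable memtable to the pending queue. Batch ids must be
    /// monotonically increasing across the imms of this instance.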
    pub fn add_imm(&mut self, imm: ImmutableMemtable) {
        if let Some(item) = self
            .staging
            .pending_imms
            .last()
            .or_else(|| self.staging.uploading_imms.front())
        {
            debug_assert!(item.batch_id() < imm.batch_id());
        }

        self.staging.pending_imm_size += imm.size();
        self.staging.pending_imms.push(imm);
    }

    pub fn pending_imm_size(&self) -> usize {
        self.staging.pending_imm_size
    }

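    /// Moves all pending imms into the uploading queue (the newest ends up at
    /// the front) and returns them so the caller can start the upload.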
    pub fn start_upload_pending_imms(&mut self) -> Vec<ImmutableMemtable> {
        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
        for imm in &pending_imms {
            self.staging.uploading_imms.push_front(imm.clone());
        }
        self.staging.pending_imm_size = 0;
        pending_imms
    }

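    /// Applies a `VersionUpdate`:
    /// - `Sst`: acknowledges uploaded imms of this instance, replacing them
    ///   with the staging SST.
    /// - `CommittedSnapshot`: drops staging data covered by the newly
    ///   committed epoch and installs the snapshot.
    /// - `NewTableWatermark`: records a new pk-prefix table watermark.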
    pub fn update(&mut self, info: VersionUpdate) {
        match info {
            VersionUpdate::Sst(staging_sst_ref) => {
                {
                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                        warn!(
                            instance_id = self.instance_id,
                            "no related imm in sst input"
                        );
                        return;
                    };

                    for imm_id in imms.iter().rev() {
                        let check_err = match self.staging.uploading_imms.pop_back() {
                            None => Some("empty".to_owned()),
                            Some(prev_imm_id) => {
                                if prev_imm_id.batch_id() == *imm_id {
                                    None
                                } else {
                                    Some(format!(
                                        "mismatched id {} {}",
                                        prev_imm_id.batch_id(),
                                        *imm_id
                                    ))
                                }
                            }
                        };
                        assert!(
                            check_err.is_none(),
                            "should be valid staging_sst.size {},
                                staging_sst.imm_ids {:?},
                                staging_sst.epochs {:?},
                                local_pending_imm_ids {:?},
                                local_uploading_imm_ids {:?},
                                instance_id {}
                                check_err {:?}",
                            staging_sst_ref.imm_size,
                            staging_sst_ref.imm_ids,
                            staging_sst_ref.epochs,
                            self.staging
                                .pending_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.staging
                                .uploading_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.instance_id,
                            check_err
                        );
                    }

                    self.staging.sst.push_front(staging_sst_ref);
                }
            }

            VersionUpdate::CommittedSnapshot(committed_version) => {
                if let Some(info) = committed_version
                    .state_table_info
                    .info()
                    .get(&self.table_id)
                {
                    let committed_epoch = info.committed_epoch;
                    if self.is_replicated {
                        self.staging
                            .uploading_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                        self.staging
                            .pending_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                    } else {
                        self.staging
                            .pending_imms
                            .iter()
                            .chain(self.staging.uploading_imms.iter())
                            .for_each(|imm| {
                                assert!(
                                    imm.min_epoch() > committed_epoch,
                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
                                    imm.table_id,
                                    imm.min_epoch(),
                                    committed_epoch
                                )
                            });
                    }

                    self.staging.sst.retain(|sst| {
                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
                    });

                    assert!(self.staging.sst.iter().all(|sst| {
                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
                    }));

                    if let Some(committed_watermarks) =
                        committed_version.table_watermarks.get(&self.table_id)
                        && let WatermarkSerdeType::PkPrefix = committed_watermarks.watermark_type
                    {
                        if let Some(watermark_index) = &mut self.table_watermarks {
                            watermark_index.apply_committed_watermarks(
                                committed_watermarks.clone(),
                                committed_epoch,
                            );
                        } else {
                            self.table_watermarks =
                                Some(PkPrefixTableWatermarksIndex::new_committed(
                                    committed_watermarks.clone(),
                                    committed_epoch,
                                ));
                        }
                    }
                }

                self.committed = committed_version;
            }
            VersionUpdate::NewTableWatermark {
                direction,
                epoch,
                vnode_watermarks,
                watermark_type,
            } => {
                assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type);
                if let Some(watermark_index) = &mut self.table_watermarks {
                    watermark_index.add_epoch_watermark(
                        epoch,
                        Arc::from(vnode_watermarks),
                        direction,
                    );
                } else {
                    self.table_watermarks = Some(PkPrefixTableWatermarksIndex::new(
                        direction,
                        epoch,
                        vnode_watermarks,
                        self.committed.table_committed_epoch(self.table_id),
                    ));
                }
            }
        }
    }

    pub fn staging(&self) -> &StagingVersion {
        &self.staging
    }

    pub fn committed(&self) -> &CommittedVersion {
        &self.committed
    }

    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
        if let Some(watermark_index) = &self.table_watermarks {
            watermark_index.filter_regress_watermarks(watermarks)
        }
    }

    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.table_watermarks
            .as_ref()
            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
    }

    pub fn is_replicated(&self) -> bool {
        self.is_replicated
    }

    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        std::mem::replace(&mut self.vnodes, vnodes)
    }

    pub fn contains(&self, vnode: VirtualNode) -> bool {
        self.vnodes.is_set(vnode.to_index())
    }

    pub fn vnodes(&self) -> Arc<Bitmap> {
        self.vnodes.clone()
    }
}

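/// Resolves a read against a `HummockReadVersion`: rewrites the key range with
/// table watermarks, then prunes staging imms/SSTs down to those overlapping
/// the (possibly rewritten) range at `epoch`.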
pub fn read_filter_for_version(
    epoch: HummockEpoch,
    table_id: TableId,
    mut table_key_range: TableKeyRange,
    read_version: &RwLock<HummockReadVersion>,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
    let read_version_guard = read_version.read();

    let committed_version = read_version_guard.committed().clone();

    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
    }

    let (imm_iter, sst_iter) =
        read_version_guard
            .staging()
            .prune_overlap(epoch, table_id, &table_key_range);

    let imms = imm_iter.cloned().collect();
    let ssts = sst_iter.cloned().collect();

    Ok((table_key_range, (imms, ssts, committed_version)))
}

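/// Reads Hummock data for a resolved version tuple: point get, forward and
/// backward range iteration, change-log iteration, and vector-index
/// nearest-neighbour queries.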
#[derive(Clone)]
pub struct HummockVersionReader {
    sstable_store: SstableStoreRef,

    state_store_metrics: Arc<HummockStateStoreMetrics>,
    preload_retry_times: usize,
}

impl HummockVersionReader {
    pub fn new(
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        preload_retry_times: usize,
    ) -> Self {
        Self {
            sstable_store,
            state_store_metrics,
            preload_retry_times,
        }
    }

    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
        &self.state_store_metrics
    }
}

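/// If fetching SST metadata while building an iterator takes longer than this
/// many seconds, a warning is logged and a cache-miss metric is recorded.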
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;

impl HummockVersionReader {
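    /// Point lookup for `table_key` at `epoch`. Sources are consulted from
    /// newest to oldest: staging imms, then uncommitted SSTs, then committed
    /// levels; the first visible version wins. Versions older than the
    /// retention-derived `min_epoch` are treated as not found.
    ///
    /// A minimal call sketch (illustrative; assumes a tuple obtained from
    /// `read_filter_for_version`):
    ///
    /// ```ignore
    /// let value = reader
    ///     .get(key, epoch, table_id, read_options, read_version_tuple, |_full_key, v| {
    ///         Ok(Bytes::copy_from_slice(v))
    ///     })
    ///     .await?;
    /// ```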
    pub async fn get<O>(
        &self,
        table_key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: ReadVersionTuple,
        on_key_value_fn: impl crate::store::KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
        let local_stats = &mut stats_guard.local_stats;
        local_stats.found_key = true;

        for imm in &imms {
            if imm.max_epoch() < min_epoch {
                continue;
            }

            local_stats.staging_imm_get_count += 1;

            if let Some((data, data_epoch)) = get_from_batch(
                imm,
                TableKey(table_key.as_ref()),
                epoch,
                &read_options,
                local_stats,
            ) {
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    data.into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v.as_ref(),
                            )
                        })
                        .transpose()?
                });
            }
        }

        let dist_key_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|dist_key| Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.table_id()));

        let full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(table_key.clone()),
            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
        );
        for local_sst in &uncommitted_ssts {
            local_stats.staging_sst_get_count += 1;
            if let Some(iter) = get_from_sstable_info(
                self.sstable_store.clone(),
                local_sst,
                full_key.to_ref(),
                &read_options,
                dist_key_hash,
                local_stats,
            )
            .await?
            {
                debug_assert!(iter.is_valid());
                let data_epoch = iter.key().epoch_with_gap;
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    iter.value()
                        .into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v,
                            )
                        })
                        .transpose()?
                });
            }
        }
        let single_table_key_range = table_key.clone()..=table_key.clone();
        assert!(committed_version.is_valid());
        for level in committed_version.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            match level.level_type {
                LevelType::Overlapping | LevelType::Unspecified => {
                    let sstable_infos = prune_overlapping_ssts(
                        &level.table_infos,
                        table_id,
                        &single_table_key_range,
                    );
                    for sstable_info in sstable_infos {
                        local_stats.overlapping_get_count += 1;
                        if let Some(iter) = get_from_sstable_info(
                            self.sstable_store.clone(),
                            sstable_info,
                            full_key.to_ref(),
                            &read_options,
                            dist_key_hash,
                            local_stats,
                        )
                        .await?
                        {
                            debug_assert!(iter.is_valid());
                            let data_epoch = iter.key().epoch_with_gap;
                            return Ok(if data_epoch.pure_epoch() < min_epoch {
                                None
                            } else {
                                iter.value()
                                    .into_user_value()
                                    .map(|v| {
                                        on_key_value_fn(
                                            FullKey::new_with_gap_epoch(
                                                table_id,
                                                table_key.to_ref(),
                                                data_epoch,
                                            ),
                                            v,
                                        )
                                    })
                                    .transpose()?
                            });
                        }
                    }
                }
                LevelType::Nonoverlapping => {
                    let mut table_info_idx =
                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                    if table_info_idx == 0 {
                        continue;
                    }
                    table_info_idx = table_info_idx.saturating_sub(1);
                    let ord = level.table_infos[table_info_idx]
                        .key_range
                        .compare_right_with_user_key(full_key.user_key.as_ref());
                    if ord == Ordering::Less {
                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                        continue;
                    }

                    local_stats.non_overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        &level.table_infos[table_info_idx],
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
        }
        stats_guard.local_stats.found_key = false;
        Ok(None)
    }

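    /// Creates a forward iterator over `table_key_range` at `epoch`; a thin
    /// wrapper around `iter_with_memtable` without a memtable iterator.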
    pub async fn iter(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
    ) -> StorageResult<HummockStorageIterator> {
        self.iter_with_memtable(
            table_key_range,
            epoch,
            table_id,
            read_options,
            read_version_tuple,
            None,
        )
        .await
    }

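    /// Builds a forward user iterator over an optional local memtable, the
    /// staging imms/SSTs, and the committed version, rewound and ready to use.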
    pub async fn iter_with_memtable<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockIterator<'b>>,
    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = ForwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = UserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

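    /// Backward counterpart of `iter_with_memtable`, yielding keys in reverse
    /// order via a `BackwardUserIterator`.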
    pub async fn rev_iter<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = BackwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = BackwardUserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageRevIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

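    /// Populates `factory` with the child iterators for a range read: one batch
    /// iterator per imm, one iterator per staging SST that passes the bloom
    /// filter, a concat iterator per non-overlapping committed level, and one
    /// iterator per overlapping committed SST. Warns and records a metric if
    /// SST meta fetching exceeds `SLOW_ITER_FETCH_META_DURATION_SECOND`.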
    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.table_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
                && !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                let mut table_infos = prune_nonoverlapping_ssts(
                    &level.table_infos,
                    user_key_range_ref,
                    table_id.table_id(),
                )
                .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    let sstable = self
                        .sstable_store
                        .sstable(&sstable_infos[0], local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        &sstable_infos[0],
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                let table_infos =
                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    assert_eq!(sstable_info.object_id, sstable.id);
                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }

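    /// Iterates the change log of `options.table_id` over `epoch_range`,
    /// merging the new-value and old-value SSTs that overlap `key_range`. Warns
    /// if the requested max epoch is absent from the recorded change log.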
    pub async fn iter_log(
        &self,
        version: PinnedVersion,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> HummockResult<ChangeLogIterator> {
        let change_log = {
            let table_change_logs = version.table_change_log_read_lock();
            if let Some(change_log) = table_change_logs.get(&options.table_id) {
                change_log.filter_epoch(epoch_range).cloned().collect_vec()
            } else {
                Vec::new()
            }
        };

        if let Some(max_epoch_change_log) = change_log.last() {
            let (_, max_epoch) = epoch_range;
            if !max_epoch_change_log.epochs().contains(&max_epoch) {
                warn!(
                    max_epoch,
                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
                    table_id = options.table_id.table_id,
                    "max_epoch does not exist"
                );
            }
        }
        let read_options = Arc::new(SstableIteratorReadOptions {
            cache_policy: Default::default(),
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });

        async fn make_iter(
            sstable_infos: impl Iterator<Item = &SstableInfo>,
            sstable_store: &SstableStoreRef,
            read_options: Arc<SstableIteratorReadOptions>,
            local_stat: &mut StoreLocalStatistic,
        ) -> HummockResult<MergeIterator<SstableIterator>> {
            let iters = try_join_all(sstable_infos.map(|sstable_info| {
                let sstable_store = sstable_store.clone();
                let read_options = read_options.clone();
                async move {
                    let mut local_stat = StoreLocalStatistic::default();
                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                    Ok::<_, HummockError>((
                        SstableIterator::new(
                            table_holder,
                            sstable_store,
                            read_options,
                            sstable_info,
                        ),
                        local_stat,
                    ))
                }
            }))
            .await?;
            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
                |(iter, stats)| {
                    local_stat.add(&stats);
                    iter
                },
            )))
        }

        let mut local_stat = StoreLocalStatistic::default();

        let new_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.new_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        let old_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.old_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        ChangeLogIterator::new(
            epoch_range,
            key_range,
            new_value_iter,
            old_value_iter,
            options.table_id,
            IterLocalMetricsGuard::new(
                self.state_store_metrics.clone(),
                options.table_id,
                local_stat,
            ),
        )
        .await
    }

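    /// Returns the `top_n` items nearest to `target` from the table's vector
    /// index, applying `on_nearest_item_fn` to each hit. Errors if the target
    /// dimension does not match the index dimension; returns an empty result
    /// if the table has no vector index.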
    pub async fn nearest<M: MeasureDistanceBuilder, O: Send>(
        &self,
        version: PinnedVersion,
        table_id: TableId,
        target: Vector,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> HummockResult<Vec<O>> {
        let Some(index) = version.vector_indexes.get(&table_id) else {
            return Ok(vec![]);
        };
        if target.dimension() != index.dimension {
            return Err(HummockError::other(format!(
                "target dimension {} does not match index dimension {}",
                target.dimension(),
                index.dimension
            )));
        }
        match &index.inner {
            VectorIndexImpl::Flat(flat) => {
                let mut builder = NearestBuilder::<'_, O, M>::new(target.to_ref(), options.top_n);
                for vector_file in &flat.vector_store_info.vector_files {
                    let meta = self.sstable_store.get_vector_file_meta(vector_file).await?;
                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
                        let block = self
                            .sstable_store
                            .get_vector_block(vector_file, i, block_meta)
                            .await?;
                        builder.add(&**block, &on_nearest_item_fn);
                    }
                }
                Ok(builder.finish())
            }
        }
    }
}