use std::cmp::Ordering;
use std::collections::vec_deque::VecDeque;
use std::collections::{Bound, HashMap};
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use futures::future::try_join_all;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::array::VectorRef;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::{
    PkPrefixTableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::LevelType;
use sync_point::sync_point;
use tracing::warn;

use crate::error::StorageResult;
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::iterator::{
    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::utils::{
    filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
    search_sst_idx,
};
use crate::hummock::vector::file::{FileVectorStore, FileVectorStoreCtx};
use crate::hummock::vector::monitor::{VectorStoreCacheStats, report_hnsw_stat};
use crate::hummock::{
    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
    hit_sstable_bloom_filter,
};
use crate::mem_table::{
    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
};
use crate::monitor::{
    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
};
use crate::store::{
    OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
};
use crate::vector::hnsw::nearest;
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

pub type CommittedVersion = PinnedVersion;

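/// SSTables flushed from immutable memtables but not yet included in a committed Hummock
/// version, together with the epochs they cover and the ids of the imms (per local
/// instance) they were built from.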
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    sstable_infos: Vec<LocalSstableInfo>,
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    epochs: Vec<HummockEpoch>,
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    imm_size: usize,
}

impl StagingSstableInfo {
    pub fn new(
        sstable_infos: Vec<LocalSstableInfo>,
        old_value_sstable_infos: Vec<LocalSstableInfo>,
        epochs: Vec<HummockEpoch>,
        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
        imm_size: usize,
    ) -> Self {
        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
        Self {
            sstable_infos,
            old_value_sstable_infos,
            epochs,
            imm_ids,
            imm_size,
        }
    }

    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.sstable_infos
    }

    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.old_value_sstable_infos
    }

    pub fn imm_size(&self) -> usize {
        self.imm_size
    }

    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.epochs
    }

    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
        &self.imm_ids
    }
}

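/// An update applied to a [`HummockReadVersion`]: newly uploaded staging SSTs, a newly
/// committed snapshot, or a new table watermark.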
pub enum VersionUpdate {
    Sst(Arc<StagingSstableInfo>),
    CommittedSnapshot(CommittedVersion),
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}

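/// Uncommitted data visible to local reads: immutable memtables waiting to be uploaded
/// (`pending_imms`), imms currently being uploaded (`uploading_imms`), and uploaded but
/// not yet committed SSTs (`sst`).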
#[derive(Clone)]
pub struct StagingVersion {
    pending_imm_size: usize,
    pub pending_imms: Vec<ImmutableMemtable>,
    pub uploading_imms: VecDeque<ImmutableMemtable>,

    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}

impl StagingVersion {
    pub fn prune_overlap<'a>(
        &'a self,
        max_epoch_inclusive: HummockEpoch,
        table_id: TableId,
        table_key_range: &'a TableKeyRange,
    ) -> (
        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
        impl Iterator<Item = &'a SstableInfo> + 'a,
    ) {
        let (left, right) = table_key_range;
        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
        let overlapped_imms = self
            .pending_imms
            .iter()
            .rev()
            .chain(self.uploading_imms.iter())
            .filter(move |imm| {
                imm.min_epoch() <= max_epoch_inclusive
                    && imm.table_id == table_id
                    && range_overlap(
                        &(left, right),
                        &imm.start_table_key(),
                        Bound::Included(&imm.end_table_key()),
                    )
            });

        let overlapped_ssts = self
            .sst
            .iter()
            .filter(move |staging_sst| {
                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
                sst_max_epoch <= max_epoch_inclusive
            })
            .flat_map(move |staging_sst| {
                staging_sst
                    .sstable_infos
                    .iter()
                    .map(|sstable| &sstable.sst_info)
                    .filter(move |sstable: &&SstableInfo| {
                        filter_single_sst(sstable, table_id, table_key_range)
                    })
            });
        (overlapped_imms, overlapped_ssts)
    }

    pub fn is_empty(&self) -> bool {
        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
    }
}

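/// A read snapshot for a single table instance: its staging data plus the committed
/// version, along with optional pk-prefix table watermarks and the vnode bitmap owned by
/// this instance.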
#[derive(Clone)]
pub struct HummockReadVersion {
    table_id: TableId,
    instance_id: LocalInstanceId,

    staging: StagingVersion,

    committed: CommittedVersion,

    is_replicated: bool,

    table_watermarks: Option<PkPrefixTableWatermarksIndex>,

    vnodes: Arc<Bitmap>,
}

impl HummockReadVersion {
    pub fn new_with_replication_option(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        is_replicated: bool,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        assert!(committed_version.is_valid());
        Self {
            table_id,
            instance_id,
            table_watermarks: {
                match committed_version.table_watermarks.get(&table_id) {
                    Some(table_watermarks) => match table_watermarks.watermark_type {
                        WatermarkSerdeType::PkPrefix => {
                            Some(PkPrefixTableWatermarksIndex::new_committed(
                                table_watermarks.clone(),
                                committed_version
                                    .state_table_info
                                    .info()
                                    .get(&table_id)
                                    .expect("should exist")
                                    .committed_epoch,
                            ))
                        }

                        WatermarkSerdeType::NonPkPrefix => None,
                    },
                    None => None,
                }
            },
            staging: StagingVersion {
                pending_imm_size: 0,
                pending_imms: Vec::default(),
                uploading_imms: VecDeque::default(),
                sst: VecDeque::default(),
            },

            committed: committed_version,

            is_replicated,
            vnodes,
        }
    }

    pub fn new(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

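    /// Adds a newly sealed immutable memtable to the pending queue. Batch ids are
    /// expected to be monotonically increasing, which the `debug_assert` checks against
    /// the most recently staged imm.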
    pub fn add_imm(&mut self, imm: ImmutableMemtable) {
        if let Some(item) = self
            .staging
            .pending_imms
            .last()
            .or_else(|| self.staging.uploading_imms.front())
        {
            debug_assert!(item.batch_id() < imm.batch_id());
        }

        self.staging.pending_imm_size += imm.size();
        self.staging.pending_imms.push(imm);
    }

    pub fn pending_imm_size(&self) -> usize {
        self.staging.pending_imm_size
    }

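    /// Moves all pending imms into the uploading queue and returns them so that the
    /// caller can spawn the actual upload task.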
    pub fn start_upload_pending_imms(&mut self) -> Vec<ImmutableMemtable> {
        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
        for imm in &pending_imms {
            self.staging.uploading_imms.push_front(imm.clone());
        }
        self.staging.pending_imm_size = 0;
        pending_imms
    }

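    /// Applies a [`VersionUpdate`]. Newly uploaded staging SSTs replace the uploading
    /// imms they were built from; a committed snapshot prunes staging data whose epochs
    /// are now covered by the committed version.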
    pub fn update(&mut self, info: VersionUpdate) {
        match info {
            VersionUpdate::Sst(staging_sst_ref) => {
                {
                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                        warn!(
                            instance_id = self.instance_id,
                            "no related imm in sst input"
                        );
                        return;
                    };

                    for imm_id in imms.iter().rev() {
                        let check_err = match self.staging.uploading_imms.pop_back() {
                            None => Some("empty".to_owned()),
                            Some(prev_imm_id) => {
                                if prev_imm_id.batch_id() == *imm_id {
                                    None
                                } else {
                                    Some(format!(
                                        "mismatched id {} {}",
                                        prev_imm_id.batch_id(),
                                        *imm_id
                                    ))
                                }
                            }
                        };
                        assert!(
                            check_err.is_none(),
                            "should be valid staging_sst.size {},
                                staging_sst.imm_ids {:?},
                                staging_sst.epochs {:?},
                                local_pending_imm_ids {:?},
                                local_uploading_imm_ids {:?},
                                instance_id {}
                                check_err {:?}",
                            staging_sst_ref.imm_size,
                            staging_sst_ref.imm_ids,
                            staging_sst_ref.epochs,
                            self.staging
                                .pending_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.staging
                                .uploading_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.instance_id,
                            check_err
                        );
                    }

                    self.staging.sst.push_front(staging_sst_ref);
                }
            }

            VersionUpdate::CommittedSnapshot(committed_version) => {
                if let Some(info) = committed_version
                    .state_table_info
                    .info()
                    .get(&self.table_id)
                {
                    let committed_epoch = info.committed_epoch;
                    if self.is_replicated {
                        self.staging
                            .uploading_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                        self.staging
                            .pending_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                    } else {
                        self.staging
                            .pending_imms
                            .iter()
                            .chain(self.staging.uploading_imms.iter())
                            .for_each(|imm| {
                                assert!(
                                    imm.min_epoch() > committed_epoch,
                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
                                    imm.table_id,
                                    imm.min_epoch(),
                                    committed_epoch
                                )
                            });
                    }

                    self.staging.sst.retain(|sst| {
                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
                    });

                    assert!(self.staging.sst.iter().all(|sst| {
                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
                    }));

                    if let Some(committed_watermarks) =
                        committed_version.table_watermarks.get(&self.table_id)
                        && let WatermarkSerdeType::PkPrefix = committed_watermarks.watermark_type
                    {
                        if let Some(watermark_index) = &mut self.table_watermarks {
                            watermark_index.apply_committed_watermarks(
                                committed_watermarks.clone(),
                                committed_epoch,
                            );
                        } else {
                            self.table_watermarks =
                                Some(PkPrefixTableWatermarksIndex::new_committed(
                                    committed_watermarks.clone(),
                                    committed_epoch,
                                ));
                        }
                    }
                }

                self.committed = committed_version;
            }
            VersionUpdate::NewTableWatermark {
                direction,
                epoch,
                vnode_watermarks,
                watermark_type,
            } => {
                assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type);
                if let Some(watermark_index) = &mut self.table_watermarks {
                    watermark_index.add_epoch_watermark(
                        epoch,
                        Arc::from(vnode_watermarks),
                        direction,
                    );
                } else {
                    self.table_watermarks = Some(PkPrefixTableWatermarksIndex::new(
                        direction,
                        epoch,
                        vnode_watermarks,
                        self.committed.table_committed_epoch(self.table_id),
                    ));
                }
            }
        }
    }

    pub fn staging(&self) -> &StagingVersion {
        &self.staging
    }

    pub fn committed(&self) -> &CommittedVersion {
        &self.committed
    }

    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
        if let Some(watermark_index) = &self.table_watermarks {
            watermark_index.filter_regress_watermarks(watermarks)
        }
    }

    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.table_watermarks
            .as_ref()
            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
    }

    pub fn is_replicated(&self) -> bool {
        self.is_replicated
    }

    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        std::mem::replace(&mut self.vnodes, vnodes)
    }

    pub fn contains(&self, vnode: VirtualNode) -> bool {
        self.vnodes.is_set(vnode.to_index())
    }

    pub fn vnodes(&self) -> Arc<Bitmap> {
        self.vnodes.clone()
    }
}

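/// Takes a consistent read snapshot under the read-version lock: the committed version is
/// cloned, the key range is rewritten according to table watermarks, and the staging imms
/// and SSTs overlapping the range are collected.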
pub fn read_filter_for_version(
    epoch: HummockEpoch,
    table_id: TableId,
    mut table_key_range: TableKeyRange,
    read_version: &RwLock<HummockReadVersion>,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
    let read_version_guard = read_version.read();

    let committed_version = read_version_guard.committed().clone();

    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
    }

    let (imm_iter, sst_iter) =
        read_version_guard
            .staging()
            .prune_overlap(epoch, table_id, &table_key_range);

    let imms = imm_iter.cloned().collect();
    let ssts = sst_iter.cloned().collect();

    Ok((table_key_range, (imms, ssts, committed_version)))
}

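/// Shared, read-only accessor over Hummock versions. It only holds the SSTable store,
/// metrics, and prefetch configuration, so it can be cloned freely across reads.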
#[derive(Clone)]
pub struct HummockVersionReader {
    sstable_store: SstableStoreRef,

    state_store_metrics: Arc<HummockStateStoreMetrics>,
    preload_retry_times: usize,
}

impl HummockVersionReader {
    pub fn new(
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        preload_retry_times: usize,
    ) -> Self {
        Self {
            sstable_store,
            state_store_metrics,
            preload_retry_times,
        }
    }

    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
        &self.state_store_metrics
    }
}

const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;

impl HummockVersionReader {
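    /// Point lookup at `epoch`: probes staging imms first, then staging SSTs, then the
    /// committed levels, and returns the newest visible value that is not older than the
    /// retention-derived `min_epoch`.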
    pub async fn get<'a, O>(
        &'a self,
        table_key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: ReadVersionTuple,
        on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
        let local_stats = &mut stats_guard.local_stats;
        local_stats.found_key = true;

        for imm in &imms {
            if imm.max_epoch() < min_epoch {
                continue;
            }

            local_stats.staging_imm_get_count += 1;

            if let Some((data, data_epoch)) = get_from_batch(
                imm,
                TableKey(table_key.as_ref()),
                epoch,
                &read_options,
                local_stats,
            ) {
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    data.into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v.as_ref(),
                            )
                        })
                        .transpose()?
                });
            }
        }
        let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| {
            Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.as_raw_id())
        });

        let full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(table_key.clone()),
            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
        );
        let single_table_key_range = table_key.clone()..=table_key.clone();

        let pruned_uncommitted_ssts =
            prune_overlapping_ssts(&uncommitted_ssts, table_id, &single_table_key_range);
        for local_sst in pruned_uncommitted_ssts {
            local_stats.staging_sst_get_count += 1;
            if let Some(iter) = get_from_sstable_info(
                self.sstable_store.clone(),
                local_sst,
                full_key.to_ref(),
                &read_options,
                dist_key_hash,
                local_stats,
            )
            .await?
            {
                debug_assert!(iter.is_valid());
                let data_epoch = iter.key().epoch_with_gap;
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    iter.value()
                        .into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v,
                            )
                        })
                        .transpose()?
                });
            }
        }
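
        // Nothing matched in the staging data; fall back to the committed version and
        // probe each level in order.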
        assert!(committed_version.is_valid());
        for level in committed_version.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            match level.level_type {
                LevelType::Overlapping | LevelType::Unspecified => {
                    let sstable_infos = prune_overlapping_ssts(
                        &level.table_infos,
                        table_id,
                        &single_table_key_range,
                    );
                    for sstable_info in sstable_infos {
                        local_stats.overlapping_get_count += 1;
                        if let Some(iter) = get_from_sstable_info(
                            self.sstable_store.clone(),
                            sstable_info,
                            full_key.to_ref(),
                            &read_options,
                            dist_key_hash,
                            local_stats,
                        )
                        .await?
                        {
                            debug_assert!(iter.is_valid());
                            let data_epoch = iter.key().epoch_with_gap;
                            return Ok(if data_epoch.pure_epoch() < min_epoch {
                                None
                            } else {
                                iter.value()
                                    .into_user_value()
                                    .map(|v| {
                                        on_key_value_fn(
                                            FullKey::new_with_gap_epoch(
                                                table_id,
                                                table_key.to_ref(),
                                                data_epoch,
                                            ),
                                            v,
                                        )
                                    })
                                    .transpose()?
                            });
                        }
                    }
                }
                LevelType::Nonoverlapping => {
                    let mut table_info_idx =
                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                    if table_info_idx == 0 {
                        continue;
                    }
                    table_info_idx = table_info_idx.saturating_sub(1);
                    let ord = level.table_infos[table_info_idx]
                        .key_range
                        .compare_right_with_user_key(full_key.user_key.as_ref());
                    if ord == Ordering::Less {
                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                        continue;
                    }

                    local_stats.non_overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        &level.table_infos[table_info_idx],
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
        }
        stats_guard.local_stats.found_key = false;
        Ok(None)
    }

    pub async fn iter(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
    ) -> StorageResult<HummockStorageIterator> {
        self.iter_with_memtable(
            table_key_range,
            epoch,
            table_id,
            read_options,
            read_version_tuple,
            None,
        )
        .await
    }

    pub async fn iter_with_memtable<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockIterator<'b>>,
    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = ForwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = UserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

    pub async fn rev_iter<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = BackwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = BackwardUserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageRevIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

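    /// Collects the iterators backing a range scan into `factory`: one iterator per
    /// staging imm, per staging SST, and per committed level (a concat iterator for
    /// non-overlapping levels), with bloom-filter pruning applied along the way.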
    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        {
            fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
                match bound {
                    Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
                    Bound::Unbounded => None,
                }
            }
            let (left, right) = &table_key_range;
            if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
                && right < left
            {
                if cfg!(debug_assertions) {
                    panic!("invalid iter key range: {table_id} {left:?} {right:?}")
                } else {
                    return Err(HummockError::other(format!(
                        "invalid iter key range: {table_id} {left:?} {right:?}"
                    ))
                    .into());
                }
            }
        }

        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.as_raw_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
                && !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                let mut table_infos =
                    prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref, table_id)
                        .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    let sstable = self
                        .sstable_store
                        .sstable(&sstable_infos[0], local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        &sstable_infos[0],
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                let table_infos =
                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    assert_eq!(sstable_info.object_id, sstable.id);
                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }

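    /// Builds a change-log iterator over the old-value and new-value SSTs recorded for
    /// `epoch_range` in the table's change log.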
    pub async fn iter_log(
        &self,
        version: PinnedVersion,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> HummockResult<ChangeLogIterator> {
        let change_log = {
            let table_change_logs = version.table_change_log_read_lock();
            if let Some(change_log) = table_change_logs.get(&options.table_id) {
                change_log.filter_epoch(epoch_range).cloned().collect_vec()
            } else {
                Vec::new()
            }
        };

        if let Some(max_epoch_change_log) = change_log.last() {
            let (_, max_epoch) = epoch_range;
            if !max_epoch_change_log.epochs().contains(&max_epoch) {
                warn!(
                    max_epoch,
                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
                    table_id = %options.table_id,
                    "max_epoch does not exist"
                );
            }
        }
        let read_options = Arc::new(SstableIteratorReadOptions {
            cache_policy: Default::default(),
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });

        async fn make_iter(
            sstable_infos: impl Iterator<Item = &SstableInfo>,
            sstable_store: &SstableStoreRef,
            read_options: Arc<SstableIteratorReadOptions>,
            local_stat: &mut StoreLocalStatistic,
        ) -> HummockResult<MergeIterator<SstableIterator>> {
            let iters = try_join_all(sstable_infos.map(|sstable_info| {
                let sstable_store = sstable_store.clone();
                let read_options = read_options.clone();
                async move {
                    let mut local_stat = StoreLocalStatistic::default();
                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                    Ok::<_, HummockError>((
                        SstableIterator::new(
                            table_holder,
                            sstable_store,
                            read_options,
                            sstable_info,
                        ),
                        local_stat,
                    ))
                }
            }))
            .await?;
            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
                |(iter, stats)| {
                    local_stat.add(&stats);
                    iter
                },
            )))
        }

        let mut local_stat = StoreLocalStatistic::default();

        let new_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.new_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        let old_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.old_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        ChangeLogIterator::new(
            epoch_range,
            key_range,
            new_value_iter,
            old_value_iter,
            options.table_id,
            IterLocalMetricsGuard::new(
                self.state_store_metrics.clone(),
                options.table_id,
                local_stat,
            ),
        )
        .await
    }

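    /// Vector nearest-neighbor query: a brute-force scan over vector file blocks for a
    /// flat index, or an HNSW graph search for an hnsw-flat index.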
    pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
        &'a self,
        version: PinnedVersion,
        table_id: TableId,
        target: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> HummockResult<Vec<O>> {
        let Some(index) = version.vector_indexes.get(&table_id) else {
            return Ok(vec![]);
        };
        if target.dimension() != index.dimension {
            return Err(HummockError::other(format!(
                "target dimension {} does not match index dimension {}",
                target.dimension(),
                index.dimension
            )));
        }
        match &index.inner {
            VectorIndexImpl::Flat(flat) => {
                let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
                let mut cache_stat = VectorStoreCacheStats::default();
                for vector_file in &flat.vector_store_info.vector_files {
                    let meta = self
                        .sstable_store
                        .get_vector_file_meta(vector_file, &mut cache_stat)
                        .await?;
                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
                        let block = self
                            .sstable_store
                            .get_vector_block(vector_file, i, block_meta, &mut cache_stat)
                            .await?;
                        builder.add(&**block, &on_nearest_item_fn);
                    }
                }
                cache_stat.report(table_id, "flat", self.stats());
                Ok(builder.finish())
            }
            VectorIndexImpl::HnswFlat(hnsw_flat) => {
                let Some(graph_file) = &hnsw_flat.graph_file else {
                    return Ok(vec![]);
                };

                let mut ctx = FileVectorStoreCtx::default();

                let graph = self
                    .sstable_store
                    .get_hnsw_graph(graph_file, &mut ctx.stats)
                    .await?;

                let vector_store =
                    FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
                let (items, stats) = nearest::<O, M, _>(
                    &vector_store,
                    &mut ctx,
                    &*graph,
                    target,
                    on_nearest_item_fn,
                    options.hnsw_ef_search,
                    options.top_n,
                )
                .await?;
                ctx.stats.report(table_id, "hnsw_read", self.stats());
                report_hnsw_stat(
                    self.stats(),
                    table_id,
                    "hnsw_read",
                    options.top_n,
                    options.hnsw_ef_search,
                    [stats],
                );
                Ok(items)
            }
        }
    }
}