use std::cmp::Ordering;
use std::collections::vec_deque::VecDeque;
use std::collections::{Bound, HashMap};
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use futures::future::try_join_all;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::array::VectorRef;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::{
    PkPrefixTableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::LevelType;
use sync_point::sync_point;
use tracing::warn;

use crate::error::StorageResult;
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::iterator::{
    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::utils::{
    filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
    search_sst_idx,
};
use crate::hummock::vector::file::{FileVectorStore, FileVectorStoreCtx};
use crate::hummock::vector::monitor::{VectorStoreCacheStats, report_hnsw_stat};
use crate::hummock::{
    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
    hit_sstable_bloom_filter,
};
use crate::mem_table::{
    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
};
use crate::monitor::{
    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
};
use crate::store::{
    OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
};
use crate::vector::hnsw::nearest;
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

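/// A pinned snapshot of a committed `HummockVersion` that stays readable for
/// as long as this handle is held.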
pub type CommittedVersion = PinnedVersion;

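/// Information about sstables that the local uploader has produced but that
/// are not yet included in a committed version.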
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
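    /// The uploaded sstables holding new-value data.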
    sstable_infos: Vec<LocalSstableInfo>,
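    /// The uploaded sstables holding old-value (change log) data.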
    old_value_sstable_infos: Vec<LocalSstableInfo>,
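    /// Epochs whose data are included in the sstables, sorted newest first.
    /// Must not be empty (asserted in `new`).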
    epochs: Vec<HummockEpoch>,
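    /// Ids of the imms that were flushed into these sstables, per local
    /// instance and newest first; `HummockReadVersion::update` matches them
    /// against the uploading imm queue.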
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    imm_size: usize,
}

impl StagingSstableInfo {
    pub fn new(
        sstable_infos: Vec<LocalSstableInfo>,
        old_value_sstable_infos: Vec<LocalSstableInfo>,
        epochs: Vec<HummockEpoch>,
        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
        imm_size: usize,
    ) -> Self {
        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
        Self {
            sstable_infos,
            old_value_sstable_infos,
            epochs,
            imm_ids,
            imm_size,
        }
    }

    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.sstable_infos
    }

    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.old_value_sstable_infos
    }

    pub fn imm_size(&self) -> usize {
        self.imm_size
    }

    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.epochs
    }

    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
        &self.imm_ids
    }
}

pub enum VersionUpdate {
    Sst(Arc<StagingSstableInfo>),
    CommittedSnapshot(CommittedVersion),
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}

#[derive(Clone)]
pub struct StagingVersion {
    pending_imm_size: usize,
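    /// Imms that are sealed but not yet scheduled for upload; newer imms are
    /// at the back.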
    pub pending_imms: Vec<ImmutableMemtable>,
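    /// Imms that have been scheduled for upload but whose sstables have not
    /// been reported back yet; newer imms are at the front.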
    pub uploading_imms: VecDeque<ImmutableMemtable>,

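    /// Uploaded but uncommitted sstables; newer sstables are at the front.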
    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}

impl StagingVersion {
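    /// Prune the staging imms and ssts down to those that may contain data of
    /// `table_id` within `table_key_range` at an epoch no greater than
    /// `max_epoch_inclusive`. Both returned iterators yield newer entries
    /// first.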
    pub fn prune_overlap<'a>(
        &'a self,
        max_epoch_inclusive: HummockEpoch,
        table_id: TableId,
        table_key_range: &'a TableKeyRange,
    ) -> (
        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
        impl Iterator<Item = &'a SstableInfo> + 'a,
    ) {
        let (left, right) = table_key_range;
        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
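        // Pending imms are newer than uploading imms, so iterate the pending
        // imms in reverse (newest first) before chaining the uploading queue.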
        let overlapped_imms = self
            .pending_imms
            .iter()
            .rev()
            .chain(self.uploading_imms.iter())
            .filter(move |imm| {
                imm.min_epoch() <= max_epoch_inclusive
                    && imm.table_id == table_id
                    && range_overlap(
                        &(left, right),
                        &imm.start_table_key(),
                        Bound::Included(&imm.end_table_key()),
                    )
            });

        let overlapped_ssts = self
            .sst
            .iter()
            .filter(move |staging_sst| {
                // `epochs` is sorted newest first, so `last()` is the oldest
                // epoch in the sst: keep the sst iff it contains any data at
                // or below the read epoch.
                let sst_min_epoch = *staging_sst.epochs.last().expect("epochs not empty");
                sst_min_epoch <= max_epoch_inclusive
            })
            .flat_map(move |staging_sst| {
                staging_sst
                    .sstable_infos
                    .iter()
                    .map(|sstable| &sstable.sst_info)
                    .filter(move |sstable: &&SstableInfo| {
                        filter_single_sst(sstable, table_id, table_key_range)
                    })
            });
        (overlapped_imms, overlapped_ssts)
    }

    pub fn is_empty(&self) -> bool {
        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
    }
}

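/// A read snapshot of a single table instance, combining its local staging
/// data with a committed version.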
#[derive(Clone)]
pub struct HummockReadVersion {
    table_id: TableId,
    instance_id: LocalInstanceId,

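    /// Local uncommitted data: staging imms and uploaded-but-uncommitted ssts.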
    staging: StagingVersion,

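    /// The latest committed version observed by this instance.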
    committed: CommittedVersion,

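    /// Whether this read version serves a replicated state table. Replicated
    /// data is not uploaded from this instance, so on a new committed snapshot
    /// its staging imms covered by the committed epoch are simply dropped
    /// rather than asserted absent (see `update`).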
    is_replicated: bool,

    table_watermarks: Option<PkPrefixTableWatermarksIndex>,

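    /// The vnode bitmap this read version instance is responsible for.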
    vnodes: Arc<Bitmap>,
}

impl HummockReadVersion {
    pub fn new_with_replication_option(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        is_replicated: bool,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        assert!(committed_version.is_valid());
        Self {
            table_id,
            instance_id,
            table_watermarks: {
                match committed_version.table_watermarks.get(&table_id) {
                    Some(table_watermarks) => match table_watermarks.watermark_type {
                        WatermarkSerdeType::PkPrefix => {
                            Some(PkPrefixTableWatermarksIndex::new_committed(
                                table_watermarks.clone(),
                                committed_version
                                    .state_table_info
                                    .info()
                                    .get(&table_id)
                                    .expect("should exist")
                                    .committed_epoch,
                            ))
                        }
                        WatermarkSerdeType::NonPkPrefix => None,
                    },
                    None => None,
                }
            },
            staging: StagingVersion {
                pending_imm_size: 0,
                pending_imms: Vec::default(),
                uploading_imms: VecDeque::default(),
                sst: VecDeque::default(),
            },

            committed: committed_version,

            is_replicated,
            vnodes,
        }
    }

    pub fn new(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

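    /// Add a newly sealed imm to the pending queue. Batch ids must be
    /// monotonically increasing, which is checked in debug builds.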
    pub fn add_imm(&mut self, imm: ImmutableMemtable) {
        if let Some(item) = self
            .staging
            .pending_imms
            .last()
            .or_else(|| self.staging.uploading_imms.front())
        {
            debug_assert!(item.batch_id() < imm.batch_id());
        }

        self.staging.pending_imm_size += imm.size();
        self.staging.pending_imms.push(imm);
    }

    pub fn pending_imm_size(&self) -> usize {
        self.staging.pending_imm_size
    }

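    /// Move all pending imms into the uploading queue (newest at the front)
    /// and return them for the uploader to consume.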
    pub fn start_upload_pending_imms(&mut self) -> Vec<ImmutableMemtable> {
        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
        for imm in &pending_imms {
            self.staging.uploading_imms.push_front(imm.clone());
        }
        self.staging.pending_imm_size = 0;
        pending_imms
    }

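    /// Apply a `VersionUpdate`:
    /// - `Sst`: record newly uploaded staging sstables and pop the uploading
    ///   imms they replace;
    /// - `CommittedSnapshot`: switch to a newer committed version and prune
    ///   the staging data it now covers;
    /// - `NewTableWatermark`: record a new per-epoch table watermark.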
    pub fn update(&mut self, info: VersionUpdate) {
        match info {
            VersionUpdate::Sst(staging_sst_ref) => {
                {
                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                        warn!(
                            instance_id = self.instance_id,
                            "no related imm in sst input"
                        );
                        return;
                    };

                    for imm_id in imms.iter().rev() {
                        let check_err = match self.staging.uploading_imms.pop_back() {
                            None => Some("empty".to_owned()),
                            Some(prev_imm) => {
                                if prev_imm.batch_id() == *imm_id {
                                    None
                                } else {
                                    Some(format!(
                                        "mismatched imm id: {} vs {}",
                                        prev_imm.batch_id(),
                                        *imm_id
                                    ))
                                }
                            }
                        };
                        assert!(
                            check_err.is_none(),
                            "inconsistent staging sst: staging_sst.imm_size {}, \
                             staging_sst.imm_ids {:?}, staging_sst.epochs {:?}, \
                             local_pending_imm_ids {:?}, local_uploading_imm_ids {:?}, \
                             instance_id {}, check_err {:?}",
                            staging_sst_ref.imm_size,
                            staging_sst_ref.imm_ids,
                            staging_sst_ref.epochs,
                            self.staging
                                .pending_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.staging
                                .uploading_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.instance_id,
                            check_err
                        );
                    }

                    self.staging.sst.push_front(staging_sst_ref);
                }
            }

            VersionUpdate::CommittedSnapshot(committed_version) => {
                if let Some(info) = committed_version
                    .state_table_info
                    .info()
                    .get(&self.table_id)
                {
                    let committed_epoch = info.committed_epoch;
                    if self.is_replicated {
                        self.staging
                            .uploading_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                        self.staging
                            .pending_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                    } else {
                        self.staging
                            .pending_imms
                            .iter()
                            .chain(self.staging.uploading_imms.iter())
                            .for_each(|imm| {
                                assert!(
                                    imm.min_epoch() > committed_epoch,
                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
                                    imm.table_id,
                                    imm.min_epoch(),
                                    committed_epoch
                                )
                            });
                    }

                    self.staging.sst.retain(|sst| {
                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
                    });

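                    // Sanity check: a retained staging sst must not straddle
                    // the committed epoch, i.e. all of its epochs must still
                    // be uncommitted.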
                    assert!(self.staging.sst.iter().all(|sst| {
                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
                    }));

                    if let Some(committed_watermarks) =
                        committed_version.table_watermarks.get(&self.table_id)
                        && let WatermarkSerdeType::PkPrefix = committed_watermarks.watermark_type
                    {
                        if let Some(watermark_index) = &mut self.table_watermarks {
                            watermark_index.apply_committed_watermarks(
                                committed_watermarks.clone(),
                                committed_epoch,
                            );
                        } else {
                            self.table_watermarks =
                                Some(PkPrefixTableWatermarksIndex::new_committed(
                                    committed_watermarks.clone(),
                                    committed_epoch,
                                ));
                        }
                    }
                }

                self.committed = committed_version;
            }
            VersionUpdate::NewTableWatermark {
                direction,
                epoch,
                vnode_watermarks,
                watermark_type,
            } => {
                assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type);
                if let Some(watermark_index) = &mut self.table_watermarks {
                    watermark_index.add_epoch_watermark(
                        epoch,
                        Arc::from(vnode_watermarks),
                        direction,
                    );
                } else {
                    self.table_watermarks = Some(PkPrefixTableWatermarksIndex::new(
                        direction,
                        epoch,
                        vnode_watermarks,
                        self.committed.table_committed_epoch(self.table_id),
                    ));
                }
            }
        }
    }

    pub fn staging(&self) -> &StagingVersion {
        &self.staging
    }

    pub fn committed(&self) -> &CommittedVersion {
        &self.committed
    }

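    /// Remove from `watermarks` any entry that would regress the latest
    /// watermark already tracked by this read version.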
    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
        if let Some(watermark_index) = &self.table_watermarks {
            watermark_index.filter_regress_watermarks(watermarks)
        }
    }

    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.table_watermarks
            .as_ref()
            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
    }

    pub fn is_replicated(&self) -> bool {
        self.is_replicated
    }

    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        std::mem::replace(&mut self.vnodes, vnodes)
    }

    pub fn contains(&self, vnode: VirtualNode) -> bool {
        self.vnodes.is_set(vnode.to_index())
    }

    pub fn vnodes(&self) -> Arc<Bitmap> {
        self.vnodes.clone()
    }
}

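/// Snapshot a read version for a read at `epoch`: rewrite the key range by the
/// table watermark if one exists, then collect the overlapping staging imms
/// and ssts together with the committed version.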
pub fn read_filter_for_version(
    epoch: HummockEpoch,
    table_id: TableId,
    mut table_key_range: TableKeyRange,
    read_version: &RwLock<HummockReadVersion>,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
    let read_version_guard = read_version.read();

    let committed_version = read_version_guard.committed().clone();

    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
    }

    let (imm_iter, sst_iter) =
        read_version_guard
            .staging()
            .prune_overlap(epoch, table_id, &table_key_range);

    let imms = imm_iter.cloned().collect();
    let ssts = sst_iter.cloned().collect();

    Ok((table_key_range, (imms, ssts, committed_version)))
}

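/// Reads against a Hummock version: point get, range iteration, change-log
/// iteration and vector search over staging data and a committed version.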
#[derive(Clone)]
pub struct HummockVersionReader {
    sstable_store: SstableStoreRef,

    state_store_metrics: Arc<HummockStateStoreMetrics>,
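    /// Max number of retries when preloading sst data; only applied when
    /// prefetching is enabled in the read options (see `iter_inner`).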
    preload_retry_times: usize,
}

impl HummockVersionReader {
    pub fn new(
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        preload_retry_times: usize,
    ) -> Self {
        Self {
            sstable_store,
            state_store_metrics,
            preload_retry_times,
        }
    }

    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
        &self.state_store_metrics
    }
}

const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;

impl HummockVersionReader {
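    /// Point lookup of `table_key` at `epoch`: probe the staging imms first,
    /// then the staging ssts, and finally each committed level, returning the
    /// newest visible value not older than the retention-derived `min_epoch`.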
    pub async fn get<'a, O>(
        &'a self,
        table_key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: ReadVersionTuple,
        on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
        let local_stats = &mut stats_guard.local_stats;
        local_stats.found_key = true;

        for imm in &imms {
            if imm.max_epoch() < min_epoch {
                continue;
            }

            local_stats.staging_imm_get_count += 1;

            if let Some((data, data_epoch)) = get_from_batch(
                imm,
                TableKey(table_key.as_ref()),
                epoch,
                &read_options,
                local_stats,
            ) {
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    data.into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v.as_ref(),
                            )
                        })
                        .transpose()?
                });
            }
        }

        let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| {
            Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.as_raw_id())
        });

        let full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(table_key.clone()),
            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
        );
        for local_sst in &uncommitted_ssts {
            local_stats.staging_sst_get_count += 1;
            if let Some(iter) = get_from_sstable_info(
                self.sstable_store.clone(),
                local_sst,
                full_key.to_ref(),
                &read_options,
                dist_key_hash,
                local_stats,
            )
            .await?
            {
                debug_assert!(iter.is_valid());
                let data_epoch = iter.key().epoch_with_gap;
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    iter.value()
                        .into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v,
                            )
                        })
                        .transpose()?
                });
            }
        }
        let single_table_key_range = table_key.clone()..=table_key.clone();
        assert!(committed_version.is_valid());
        for level in committed_version.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            match level.level_type {
                LevelType::Overlapping | LevelType::Unspecified => {
                    let sstable_infos = prune_overlapping_ssts(
                        &level.table_infos,
                        table_id,
                        &single_table_key_range,
                    );
                    for sstable_info in sstable_infos {
                        local_stats.overlapping_get_count += 1;
                        if let Some(iter) = get_from_sstable_info(
                            self.sstable_store.clone(),
                            sstable_info,
                            full_key.to_ref(),
                            &read_options,
                            dist_key_hash,
                            local_stats,
                        )
                        .await?
                        {
                            debug_assert!(iter.is_valid());
                            let data_epoch = iter.key().epoch_with_gap;
                            return Ok(if data_epoch.pure_epoch() < min_epoch {
                                None
                            } else {
                                iter.value()
                                    .into_user_value()
                                    .map(|v| {
                                        on_key_value_fn(
                                            FullKey::new_with_gap_epoch(
                                                table_id,
                                                table_key.to_ref(),
                                                data_epoch,
                                            ),
                                            v,
                                        )
                                    })
                                    .transpose()?
                            });
                        }
                    }
                }
                LevelType::Nonoverlapping => {
                    let mut table_info_idx =
                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                    if table_info_idx == 0 {
                        continue;
                    }
                    table_info_idx = table_info_idx.saturating_sub(1);
                    let ord = level.table_infos[table_info_idx]
                        .key_range
                        .compare_right_with_user_key(full_key.user_key.as_ref());
                    if ord == Ordering::Less {
                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                        continue;
                    }

                    local_stats.non_overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        &level.table_infos[table_info_idx],
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
        }
        stats_guard.local_stats.found_key = false;
        Ok(None)
    }

    pub async fn iter(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
    ) -> StorageResult<HummockStorageIterator> {
        self.iter_with_memtable(
            table_key_range,
            epoch,
            table_id,
            read_options,
            read_version_tuple,
            None,
        )
        .await
    }

    pub async fn iter_with_memtable<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockIterator<'b>>,
    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = ForwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = UserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

    pub async fn rev_iter<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = BackwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = BackwardUserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageRevIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

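    /// Collect the inputs for a merge iterator over the given range: one
    /// iterator per staging imm and staging sst, plus per-level iterators over
    /// the committed version (a concat iterator when a non-overlapping level
    /// contributes multiple ssts).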
    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        {
            fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
                match bound {
                    Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
                    Bound::Unbounded => None,
                }
            }
            let (left, right) = &table_key_range;
            if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
                && right < left
            {
                if cfg!(debug_assertions) {
                    panic!("invalid iter key range: {table_id} {left:?} {right:?}")
                } else {
                    return Err(HummockError::other(format!(
                        "invalid iter key range: {table_id} {left:?} {right:?}"
                    ))
                    .into());
                }
            }
        }

        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.as_raw_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
                && !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                let mut table_infos =
                    prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref, table_id)
                        .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    let sstable = self
                        .sstable_store
                        .sstable(&sstable_infos[0], local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
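                    // Only one sst in this non-overlapping level overlaps the
                    // range, so add it as a plain sstable iterator instead of
                    // a concat iterator.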
1035 factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
1041 sstable,
1042 self.sstable_store.clone(),
1043 sst_read_options.clone(),
1044 &sstable_infos[0],
1045 ));
1046 local_stats.non_overlapping_iter_count += 1;
1047 }
1048 } else {
1049 let table_infos =
1050 prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
1051 let fetch_meta_req = table_infos.rev().collect_vec();
1053 if fetch_meta_req.is_empty() {
1054 continue;
1055 }
1056 for sstable_info in fetch_meta_req {
1057 let sstable = self
1058 .sstable_store
1059 .sstable(sstable_info, local_stats)
1060 .await?;
1061 assert_eq!(sstable_info.object_id, sstable.id);
1062 if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
1063 && !hit_sstable_bloom_filter(
1064 &sstable,
1065 &user_key_range_ref,
1066 *dist_hash,
1067 local_stats,
1068 )
1069 {
1070 continue;
1071 }
1072 factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
1073 sstable,
1074 self.sstable_store.clone(),
1075 sst_read_options.clone(),
1076 sstable_info,
1077 ));
1078 local_stats.overlapping_iter_count += 1;
1079 }
1080 }
1081 }
1082 let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
1083 if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
1084 let table_id_string = table_id.to_string();
1085 tracing::warn!(
1086 "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
1087 table_id_string,
1088 epoch,
1089 fetch_meta_duration_sec,
1090 local_stats.cache_meta_block_miss
1091 );
1092 self.state_store_metrics
1093 .iter_slow_fetch_meta_cache_unhits
1094 .set(local_stats.cache_meta_block_miss as i64);
1095 }
1096 Ok(())
1097 }
1098
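    /// Iterate the change log of `options.table_id` within `epoch_range`,
    /// merging the new-value and old-value sstables that overlap `key_range`.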
    pub async fn iter_log(
        &self,
        version: PinnedVersion,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> HummockResult<ChangeLogIterator> {
        let change_log = {
            let table_change_logs = version.table_change_log_read_lock();
            if let Some(change_log) = table_change_logs.get(&options.table_id) {
                change_log.filter_epoch(epoch_range).cloned().collect_vec()
            } else {
                Vec::new()
            }
        };

        if let Some(max_epoch_change_log) = change_log.last() {
            let (_, max_epoch) = epoch_range;
            if !max_epoch_change_log.epochs().contains(&max_epoch) {
                warn!(
                    max_epoch,
                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
                    table_id = %options.table_id,
                    "max_epoch does not exist"
                );
            }
        }
        let read_options = Arc::new(SstableIteratorReadOptions {
            cache_policy: Default::default(),
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });

        async fn make_iter(
            sstable_infos: impl Iterator<Item = &SstableInfo>,
            sstable_store: &SstableStoreRef,
            read_options: Arc<SstableIteratorReadOptions>,
            local_stat: &mut StoreLocalStatistic,
        ) -> HummockResult<MergeIterator<SstableIterator>> {
            let iters = try_join_all(sstable_infos.map(|sstable_info| {
                let sstable_store = sstable_store.clone();
                let read_options = read_options.clone();
                async move {
                    let mut local_stat = StoreLocalStatistic::default();
                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                    Ok::<_, HummockError>((
                        SstableIterator::new(
                            table_holder,
                            sstable_store,
                            read_options,
                            sstable_info,
                        ),
                        local_stat,
                    ))
                }
            }))
            .await?;
            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
                |(iter, stats)| {
                    local_stat.add(&stats);
                    iter
                },
            )))
        }

        let mut local_stat = StoreLocalStatistic::default();

        let new_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.new_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        let old_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.old_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        ChangeLogIterator::new(
            epoch_range,
            key_range,
            new_value_iter,
            old_value_iter,
            options.table_id,
            IterLocalMetricsGuard::new(
                self.state_store_metrics.clone(),
                options.table_id,
                local_stat,
            ),
        )
        .await
    }

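    /// Top-n nearest-neighbor query against the vector index of `table_id`,
    /// either by a flat scan over all vector files or via an HNSW graph
    /// search.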
    pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
        &'a self,
        version: PinnedVersion,
        table_id: TableId,
        target: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> HummockResult<Vec<O>> {
        let Some(index) = version.vector_indexes.get(&table_id) else {
            return Ok(vec![]);
        };
        if target.dimension() != index.dimension {
            return Err(HummockError::other(format!(
                "target dimension {} does not match index dimension {}",
                target.dimension(),
                index.dimension
            )));
        }
        match &index.inner {
            VectorIndexImpl::Flat(flat) => {
                let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
                let mut cache_stat = VectorStoreCacheStats::default();
                for vector_file in &flat.vector_store_info.vector_files {
                    let meta = self
                        .sstable_store
                        .get_vector_file_meta(vector_file, &mut cache_stat)
                        .await?;
                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
                        let block = self
                            .sstable_store
                            .get_vector_block(vector_file, i, block_meta, &mut cache_stat)
                            .await?;
                        builder.add(&**block, &on_nearest_item_fn);
                    }
                }
                cache_stat.report(table_id, "flat", self.stats());
                Ok(builder.finish())
            }
            VectorIndexImpl::HnswFlat(hnsw_flat) => {
                let Some(graph_file) = &hnsw_flat.graph_file else {
                    return Ok(vec![]);
                };

                let mut ctx = FileVectorStoreCtx::default();

                let graph = self
                    .sstable_store
                    .get_hnsw_graph(graph_file, &mut ctx.stats)
                    .await?;

                let vector_store =
                    FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
                let (items, stats) = nearest::<O, M, _>(
                    &vector_store,
                    &mut ctx,
                    &*graph,
                    target,
                    on_nearest_item_fn,
                    options.hnsw_ef_search,
                    options.top_n,
                )
                .await?;
                ctx.stats.report(table_id, "hnsw_read", self.stats());
                report_hnsw_stat(
                    self.stats(),
                    table_id,
                    "hnsw_read",
                    options.top_n,
                    options.hnsw_ef_search,
                    [stats],
                );
                Ok(items)
            }
        }
    }
}