use std::cmp::Ordering;
use std::collections::vec_deque::VecDeque;
use std::collections::{Bound, HashMap};
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use futures::future::try_join_all;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::array::VectorRef;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::{
    TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::LevelType;
use sync_point::sync_point;
use tracing::warn;

use crate::error::StorageResult;
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::iterator::{
    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::utils::{
    MemoryTracker, filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts,
    range_overlap, search_sst_idx,
};
use crate::hummock::vector::file::{FileVectorStore, FileVectorStoreCtx};
use crate::hummock::vector::monitor::{VectorStoreCacheStats, report_hnsw_stat};
use crate::hummock::{
    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
    hit_sstable_bloom_filter,
};
use crate::mem_table::{
    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
};
use crate::monitor::{
    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
};
use crate::store::{
    OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
};
use crate::vector::hnsw::nearest;
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

pub type CommittedVersion = PinnedVersion;

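/// Metadata of sstables that have been flushed from shared-buffer imms but are not yet
/// committed, together with the ids of the imms they were built from.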
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    sstable_infos: Vec<LocalSstableInfo>,
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    /// Epochs covered by the sstables, sorted in descending order (checked in `new`).
    /// Must be non-empty.
    epochs: Vec<HummockEpoch>,
    /// Ids of the imms, per local instance, that were flushed into these sstables.
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    imm_size: usize,
}

impl StagingSstableInfo {
    pub fn new(
        sstable_infos: Vec<LocalSstableInfo>,
        old_value_sstable_infos: Vec<LocalSstableInfo>,
        epochs: Vec<HummockEpoch>,
        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
        imm_size: usize,
    ) -> Self {
        // epochs are expected to be sorted from newer to older (descending)
        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
        Self {
            sstable_infos,
            old_value_sstable_infos,
            epochs,
            imm_ids,
            imm_size,
        }
    }

    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.sstable_infos
    }

    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.old_value_sstable_infos
    }

    pub fn imm_size(&self) -> usize {
        self.imm_size
    }

    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.epochs
    }

    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
        &self.imm_ids
    }
}

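/// An update to be applied to a [`HummockReadVersion`].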
pub enum VersionUpdate {
    Sst(Arc<StagingSstableInfo>),
    CommittedSnapshot(CommittedVersion),
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}

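/// Local uncommitted data: imms that are pending upload, imms currently being uploaded,
/// and sstables that are uploaded but not yet committed.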
pub struct StagingVersion {
    pending_imm_size: usize,
    /// Imms not yet sent to the uploader, with the newer imm at the back.
    pub pending_imms: Vec<(ImmutableMemtable, MemoryTracker)>,
    /// Imms that have been sent to the uploader, with the newer imm at the front.
    pub uploading_imms: VecDeque<ImmutableMemtable>,

    /// Uploaded but uncommitted ssts, with the newer sst at the front.
    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}

impl StagingVersion {
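    /// Returns the staging imms and ssts of `table_id` that overlap with `table_key_range`
    /// and may contain data visible at `max_epoch_inclusive`. Imms are returned from
    /// newest to oldest.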
    pub fn prune_overlap<'a>(
        &'a self,
        max_epoch_inclusive: HummockEpoch,
        table_id: TableId,
        table_key_range: &'a TableKeyRange,
    ) -> (
        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
        impl Iterator<Item = &'a SstableInfo> + 'a,
    ) {
        let (left, right) = table_key_range;
        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
        let overlapped_imms = self
            .pending_imms
            .iter()
            .map(|(imm, _)| imm)
            .rev()
            .chain(self.uploading_imms.iter())
            .filter(move |imm| {
                // retain imms of this table that are visible at `max_epoch_inclusive`
                // and overlap with the key range
                imm.min_epoch() <= max_epoch_inclusive
                    && imm.table_id == table_id
                    && range_overlap(
                        &(left, right),
                        &imm.start_table_key(),
                        Bound::Included(&imm.end_table_key()),
                    )
            });

        let overlapped_ssts = self
            .sst
            .iter()
            .filter(move |staging_sst| {
                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
                sst_max_epoch <= max_epoch_inclusive
            })
            .flat_map(move |staging_sst| {
                staging_sst
                    .sstable_infos
                    .iter()
                    .map(|sstable| &sstable.sst_info)
                    .filter(move |sstable: &&SstableInfo| {
                        filter_single_sst(sstable, table_id, table_key_range)
                    })
            });
        (overlapped_imms, overlapped_ssts)
    }

    pub fn is_empty(&self) -> bool {
        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
    }
}

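/// A container of all the information required for reading one table from hummock:
/// local staging data, the committed version, table watermarks and the vnode bitmap.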
pub struct HummockReadVersion {
    table_id: TableId,
    instance_id: LocalInstanceId,

    is_initialized: bool,

    /// Local version for staging data.
    staging: StagingVersion,

    /// Remote version for committed data.
    committed: CommittedVersion,

    /// Whether this read version serves a replicated state table. Replicated imms are
    /// never uploaded from here; they are dropped once covered by a committed epoch.
    is_replicated: bool,

    table_watermarks: Option<TableWatermarksIndex>,

    /// Vnode bitmap corresponding to this read version.
    vnodes: Arc<Bitmap>,
}

impl HummockReadVersion {
    pub fn new_with_replication_option(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        is_replicated: bool,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        assert!(committed_version.is_valid());
        Self {
            table_id,
            instance_id,
            table_watermarks: {
                match committed_version.table_watermarks.get(&table_id) {
                    Some(table_watermarks) => Some(TableWatermarksIndex::new_committed(
                        table_watermarks.clone(),
                        committed_version
                            .state_table_info
                            .info()
                            .get(&table_id)
                            .expect("should exist")
                            .committed_epoch,
                        table_watermarks.watermark_type,
                    )),
                    None => None,
                }
            },
            staging: StagingVersion {
                pending_imm_size: 0,
                pending_imms: Vec::default(),
                uploading_imms: VecDeque::default(),
                sst: VecDeque::default(),
            },

            committed: committed_version,

            is_replicated,
            vnodes,
            is_initialized: false,
        }
    }

    pub fn new(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn init(&mut self) {
        assert!(!self.is_initialized);
        self.is_initialized = true;
    }

    pub fn add_pending_imm(&mut self, imm: ImmutableMemtable, tracker: MemoryTracker) {
        assert!(self.is_initialized);
        assert!(!self.is_replicated);
        if let Some(item) = self
            .staging
            .pending_imms
            .last()
            .map(|(imm, _)| imm)
            .or_else(|| self.staging.uploading_imms.front())
        {
            // batch ids of staging imms must be strictly increasing
            assert!(item.batch_id() < imm.batch_id());
        }

        self.staging.pending_imm_size += imm.size();
        self.staging.pending_imms.push((imm, tracker));
    }

    pub fn add_replicated_imm(&mut self, imm: ImmutableMemtable) {
        assert!(self.is_initialized);
        assert!(self.is_replicated);
        assert!(self.staging.pending_imms.is_empty());
        if let Some(item) = self.staging.uploading_imms.front() {
            assert!(item.batch_id() < imm.batch_id());
        }
        self.staging.uploading_imms.push_front(imm);
    }

    pub fn pending_imm_size(&self) -> usize {
        self.staging.pending_imm_size
    }

    pub fn start_upload_pending_imms(&mut self) -> Vec<(ImmutableMemtable, MemoryTracker)> {
        assert!(self.is_initialized);
        assert!(!self.is_replicated);
        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
        for (imm, _) in &pending_imms {
            self.staging.uploading_imms.push_front(imm.clone());
        }
        self.staging.pending_imm_size = 0;
        pending_imms
    }

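    /// Applies a `VersionUpdate`:
    /// - `Sst`: replaces the oldest imms in `uploading_imms` with the staging sst they
    ///   were flushed into.
    /// - `CommittedSnapshot`: drops staging data covered by the newly committed epoch and
    ///   installs the new committed version and watermarks.
    /// - `NewTableWatermark`: records a new per-epoch table watermark.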
    pub fn update(&mut self, info: VersionUpdate) {
        match info {
            VersionUpdate::Sst(staging_sst_ref) => {
                {
                    assert!(!self.is_replicated);
                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                        warn!(
                            instance_id = self.instance_id,
                            "no related imm in sst input"
                        );
                        return;
                    };

                    // pop the oldest uploading imms and verify that they match the imms
                    // that were flushed into this sst
                    for imm_id in imms.iter().rev() {
                        let check_err = match self.staging.uploading_imms.pop_back() {
                            None => Some("empty".to_owned()),
                            Some(prev_imm_id) => {
                                if prev_imm_id.batch_id() == *imm_id {
                                    None
                                } else {
                                    Some(format!(
                                        "mismatched id {} {}",
                                        prev_imm_id.batch_id(),
                                        *imm_id
                                    ))
                                }
                            }
                        };
                        assert!(
                            check_err.is_none(),
                            "should be valid staging_sst.size {},
                                    staging_sst.imm_ids {:?},
                                    staging_sst.epochs {:?},
                                    local_pending_imm_ids {:?},
                                    local_uploading_imm_ids {:?},
                                    instance_id {}
                                    check_err {:?}",
                            staging_sst_ref.imm_size,
                            staging_sst_ref.imm_ids,
                            staging_sst_ref.epochs,
                            self.staging
                                .pending_imms
                                .iter()
                                .map(|(imm, _)| imm.batch_id())
                                .collect_vec(),
                            self.staging
                                .uploading_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.instance_id,
                            check_err
                        );
                    }

                    self.staging.sst.push_front(staging_sst_ref);
                }
            }

            VersionUpdate::CommittedSnapshot(committed_version) => {
                if let Some(info) = committed_version
                    .state_table_info
                    .info()
                    .get(&self.table_id)
                {
                    let committed_epoch = info.committed_epoch;
                    if self.is_replicated {
                        self.staging
                            .uploading_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                        self.staging
                            .pending_imms
                            .retain(|(imm, _)| imm.min_epoch() > committed_epoch);
                    } else {
                        self.staging
                            .pending_imms
                            .iter()
                            .map(|(imm, _)| imm)
                            .chain(self.staging.uploading_imms.iter())
                            .for_each(|imm| {
                                assert!(
                                    imm.min_epoch() > committed_epoch,
                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
                                    imm.table_id,
                                    imm.min_epoch(),
                                    committed_epoch
                                )
                            });
                    }

                    self.staging.sst.retain(|sst| {
                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
                    });

                    // the remaining staging ssts are expected to be entirely newer than
                    // the committed epoch
                    assert!(self.staging.sst.iter().all(|sst| {
                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
                    }));

                    if let Some(committed_watermarks) =
                        committed_version.table_watermarks.get(&self.table_id)
                    {
                        if let Some(watermark_index) = &mut self.table_watermarks {
                            watermark_index.apply_committed_watermarks(
                                committed_watermarks.clone(),
                                committed_epoch,
                            );
                        } else {
                            self.table_watermarks = Some(TableWatermarksIndex::new_committed(
                                committed_watermarks.clone(),
                                committed_epoch,
                                committed_watermarks.watermark_type,
                            ));
                        }
                    }
                }

                self.committed = committed_version;
            }
            VersionUpdate::NewTableWatermark {
                direction,
                epoch,
                vnode_watermarks,
                watermark_type,
            } => {
                if let Some(watermark_index) = &mut self.table_watermarks {
                    watermark_index.add_epoch_watermark(
                        epoch,
                        Arc::from(vnode_watermarks),
                        direction,
                    );
                } else {
                    self.table_watermarks = Some(TableWatermarksIndex::new(
                        direction,
                        epoch,
                        vnode_watermarks,
                        self.committed.table_committed_epoch(self.table_id),
                        watermark_type,
                    ));
                }
            }
        }
    }

    pub fn staging(&self) -> &StagingVersion {
        &self.staging
    }

    pub fn committed(&self) -> &CommittedVersion {
        &self.committed
    }

    /// Removes watermarks that regress against the watermarks already recorded in the
    /// index, so that the table watermark stays monotonic.
    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
        if let Some(watermark_index) = &self.table_watermarks {
            watermark_index.filter_regress_watermarks(watermarks)
        }
    }

    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.table_watermarks
            .as_ref()
            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
    }

    pub fn is_initialized(&self) -> bool {
        self.is_initialized
    }

    pub fn is_replicated(&self) -> bool {
        self.is_replicated
    }

    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        std::mem::replace(&mut self.vnodes, vnodes)
    }

    pub fn contains(&self, vnode: VirtualNode) -> bool {
        self.vnodes.is_set(vnode.to_index())
    }

    pub fn vnodes(&self) -> Arc<Bitmap> {
        self.vnodes.clone()
    }
}

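/// Prepares a read at `epoch` from the given read version: rewrites the key range with the
/// table watermark, prunes the staging imms and ssts that overlap with it, and returns them
/// together with a clone of the committed version.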
pub fn read_filter_for_version(
    epoch: HummockEpoch,
    table_id: TableId,
    mut table_key_range: TableKeyRange,
    read_version: &RwLock<HummockReadVersion>,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
    let read_version_guard = read_version.read();

    let committed_version = read_version_guard.committed().clone();

    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
    }

    let (imm_iter, sst_iter) =
        read_version_guard
            .staging()
            .prune_overlap(epoch, table_id, &table_key_range);

    let imms = imm_iter.cloned().collect();
    let ssts = sst_iter.cloned().collect();

    Ok((table_key_range, (imms, ssts, committed_version)))
}

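/// A stateless reader that serves point gets, range iterators, change-log iterators and
/// vector queries against a given read version tuple or pinned version.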
#[derive(Clone)]
pub struct HummockVersionReader {
    sstable_store: SstableStoreRef,

    /// Statistics
    state_store_metrics: Arc<HummockStateStoreMetrics>,
    /// Max number of retries when preloading sstable data for prefetching iterators.
    preload_retry_times: usize,
}

impl HummockVersionReader {
    pub fn new(
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        preload_retry_times: usize,
    ) -> Self {
        Self {
            sstable_store,
            state_store_metrics,
            preload_retry_times,
        }
    }

    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
        &self.state_store_metrics
    }
}

/// Threshold in seconds above which fetching sstable metadata while creating an iterator
/// is reported as slow.
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;

impl HummockVersionReader {
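    /// Point get of `table_key` at `epoch`: searches the staging imms first, then the
    /// staging ssts, and finally the committed levels, returning the first visible
    /// version of the key that is not below the retention watermark.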
    pub async fn get<'a, O>(
        &'a self,
        table_key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
        table_option: TableOption,
        read_options: ReadOptions,
        read_version_tuple: ReadVersionTuple,
        on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
        let local_stats = &mut stats_guard.local_stats;
        local_stats.found_key = true;

        // 1. search the staging imms, from the newest to the oldest
        for imm in &imms {
            // skip imms whose data is entirely below the retention watermark
            if imm.max_epoch() < min_epoch {
                continue;
            }

            local_stats.staging_imm_get_count += 1;

            if let Some((data, data_epoch)) = get_from_batch(
                imm,
                TableKey(table_key.as_ref()),
                epoch,
                &read_options,
                local_stats,
            ) {
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    data.into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v.as_ref(),
                            )
                        })
                        .transpose()?
                });
            }
        }

        let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| {
            Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.as_raw_id())
        });

        // seek with the maximum spill offset so that every version of the key at `epoch`
        // is covered by the constructed full key
        let full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(table_key.clone()),
            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
        );
        let single_table_key_range = table_key.clone()..=table_key.clone();

        // 2. search the uncommitted staging ssts
        let pruned_uncommitted_ssts =
            prune_overlapping_ssts(&uncommitted_ssts, table_id, &single_table_key_range);
        for local_sst in pruned_uncommitted_ssts {
            local_stats.staging_sst_get_count += 1;
            if let Some(iter) = get_from_sstable_info(
                self.sstable_store.clone(),
                local_sst,
                full_key.to_ref(),
                &read_options,
                dist_key_hash,
                local_stats,
            )
            .await?
            {
                debug_assert!(iter.is_valid());
                let data_epoch = iter.key().epoch_with_gap;
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    iter.value()
                        .into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v,
                            )
                        })
                        .transpose()?
                });
            }
        }

        // 3. search the committed levels
        assert!(committed_version.is_valid());
        for level in committed_version.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            match level.level_type {
                LevelType::Overlapping | LevelType::Unspecified => {
                    let sstable_infos = prune_overlapping_ssts(
                        &level.table_infos,
                        table_id,
                        &single_table_key_range,
                    );
                    for sstable_info in sstable_infos {
                        local_stats.overlapping_get_count += 1;
                        if let Some(iter) = get_from_sstable_info(
                            self.sstable_store.clone(),
                            sstable_info,
                            full_key.to_ref(),
                            &read_options,
                            dist_key_hash,
                            local_stats,
                        )
                        .await?
                        {
                            debug_assert!(iter.is_valid());
                            let data_epoch = iter.key().epoch_with_gap;
                            return Ok(if data_epoch.pure_epoch() < min_epoch {
                                None
                            } else {
                                iter.value()
                                    .into_user_value()
                                    .map(|v| {
                                        on_key_value_fn(
                                            FullKey::new_with_gap_epoch(
                                                table_id,
                                                table_key.to_ref(),
                                                data_epoch,
                                            ),
                                            v,
                                        )
                                    })
                                    .transpose()?
                            });
                        }
                    }
                }
                LevelType::Nonoverlapping => {
                    let mut table_info_idx =
                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                    if table_info_idx == 0 {
                        continue;
                    }
                    table_info_idx = table_info_idx.saturating_sub(1);
                    let ord = level.table_infos[table_info_idx]
                        .key_range
                        .compare_right_with_user_key(full_key.user_key.as_ref());
                    // the key is beyond the right bound of the candidate sst, so no file
                    // in this level can contain it
                    if ord == Ordering::Less {
                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                        continue;
                    }

                    local_stats.non_overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        &level.table_infos[table_info_idx],
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
        }
        stats_guard.local_stats.found_key = false;
        Ok(None)
    }

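    /// Creates a forward iterator over `table_key_range` at `epoch`; equivalent to
    /// `iter_with_memtable` without an extra memtable iterator.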
    pub async fn iter(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        table_option: TableOption,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
    ) -> StorageResult<HummockStorageIterator> {
        self.iter_with_memtable(
            table_key_range,
            epoch,
            table_id,
            table_option,
            read_options,
            read_version_tuple,
            None,
        )
        .await
    }

    pub async fn iter_with_memtable<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        table_option: TableOption,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockIterator<'b>>,
    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = ForwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = UserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

    pub async fn rev_iter<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        table_option: TableOption,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = BackwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = BackwardUserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageRevIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

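    /// Shared setup for forward and backward iteration: validates the key range, then
    /// registers batch iterators for imms, sstable iterators for staging ssts, and
    /// concat or per-sst iterators for the committed levels into `factory`.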
    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        {
            // reject a reversed key range
            fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
                match bound {
                    Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
                    Bound::Unbounded => None,
                }
            }
            let (left, right) = &table_key_range;
            if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
                && right < left
            {
                if cfg!(debug_assertions) {
                    panic!("invalid iter key range: {table_id} {left:?} {right:?}")
                } else {
                    return Err(HummockError::other(format!(
                        "invalid iter key range: {table_id} {left:?} {right:?}"
                    ))
                    .into());
                }
            }
        }

        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        // compute the bloom filter hash of the prefix hint once
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.as_raw_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
                && !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                let mut table_infos =
                    prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref, table_id)
                        .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    let sstable = self
                        .sstable_store
                        .sstable(&sstable_infos[0], local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    // only one sst remains in this non-overlapping level, so a concat
                    // iterator is unnecessary; the plain sstable iterator is registered
                    // as an overlapping iter for convenience, but it is still counted in
                    // `non_overlapping_iter_count`
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        &sstable_infos[0],
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                let table_infos =
                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    assert_eq!(sstable_info.object_id, sstable.id);
                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }

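    /// Creates an iterator over the change log of `options.table_id` within `epoch_range`,
    /// merging the new-value and old-value sstables that overlap with `key_range`.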
    pub async fn iter_log(
        &self,
        version: PinnedVersion,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> HummockResult<ChangeLogIterator> {
        let change_log = {
            let table_change_logs = version.table_change_log_read_lock();
            if let Some(change_log) = table_change_logs.get(&options.table_id) {
                change_log.filter_epoch(epoch_range).cloned().collect_vec()
            } else {
                Vec::new()
            }
        };

        if let Some(max_epoch_change_log) = change_log.last() {
            let (_, max_epoch) = epoch_range;
            if !max_epoch_change_log.epochs().contains(&max_epoch) {
                warn!(
                    max_epoch,
                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
                    table_id = %options.table_id,
                    "max_epoch does not exist"
                );
            }
        }
        let read_options = Arc::new(SstableIteratorReadOptions {
            cache_policy: Default::default(),
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });

        async fn make_iter(
            sstable_infos: impl Iterator<Item = &SstableInfo>,
            sstable_store: &SstableStoreRef,
            read_options: Arc<SstableIteratorReadOptions>,
            local_stat: &mut StoreLocalStatistic,
        ) -> HummockResult<MergeIterator<SstableIterator>> {
            let iters = try_join_all(sstable_infos.map(|sstable_info| {
                let sstable_store = sstable_store.clone();
                let read_options = read_options.clone();
                async move {
                    let mut local_stat = StoreLocalStatistic::default();
                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                    Ok::<_, HummockError>((
                        SstableIterator::new(
                            table_holder,
                            sstable_store,
                            read_options,
                            sstable_info,
                        ),
                        local_stat,
                    ))
                }
            }))
            .await?;
            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
                |(iter, stats)| {
                    local_stat.add(&stats);
                    iter
                },
            )))
        }

        let mut local_stat = StoreLocalStatistic::default();

        let new_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.new_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        let old_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.old_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        ChangeLogIterator::new(
            epoch_range,
            key_range,
            new_value_iter,
            old_value_iter,
            options.table_id,
            IterLocalMetricsGuard::new(
                self.state_store_metrics.clone(),
                options.table_id,
                local_stat,
            ),
        )
        .await
    }

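    /// Nearest-neighbor query against the vector index of `table_id`: scans every vector
    /// file of a flat index, or searches the HNSW graph of an HNSW-flat index.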
    pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
        &'a self,
        version: PinnedVersion,
        table_id: TableId,
        target: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> HummockResult<Vec<O>> {
        let Some(index) = version.vector_indexes.get(&table_id) else {
            return Ok(vec![]);
        };
        if target.dimension() != index.dimension {
            return Err(HummockError::other(format!(
                "target dimension {} does not match index dimension {}",
                target.dimension(),
                index.dimension
            )));
        }
        match &index.inner {
            VectorIndexImpl::Flat(flat) => {
                let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
                let mut cache_stat = VectorStoreCacheStats::default();
                for vector_file in &flat.vector_store_info.vector_files {
                    let meta = self
                        .sstable_store
                        .get_vector_file_meta(vector_file, &mut cache_stat)
                        .await?;
                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
                        let block = self
                            .sstable_store
                            .get_vector_block(vector_file, i, block_meta, &mut cache_stat)
                            .await?;
                        builder.add(&**block, &on_nearest_item_fn);
                    }
                }
                cache_stat.report(table_id, "flat", self.stats());
                Ok(builder.finish())
            }
            VectorIndexImpl::HnswFlat(hnsw_flat) => {
                let Some(graph_file) = &hnsw_flat.graph_file else {
                    return Ok(vec![]);
                };

                let mut ctx = FileVectorStoreCtx::default();

                let graph = self
                    .sstable_store
                    .get_hnsw_graph(graph_file, &mut ctx.stats)
                    .await?;

                let vector_store =
                    FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
                let (items, stats) = nearest::<O, M, _>(
                    &vector_store,
                    &mut ctx,
                    &*graph,
                    target,
                    on_nearest_item_fn,
                    options.hnsw_ef_search,
                    options.top_n,
                )
                .await?;
                ctx.stats.report(table_id, "hnsw_read", self.stats());
                report_hnsw_stat(
                    self.stats(),
                    table_id,
                    "hnsw_read",
                    options.top_n,
                    options.hnsw_ef_search,
                    [stats],
                );
                Ok(items)
            }
        }
    }
}