use std::cmp::Ordering;
use std::collections::vec_deque::VecDeque;
use std::collections::{Bound, HashMap};
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use futures::future::try_join_all;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::array::VectorRef;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::{
    PkPrefixTableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::LevelType;
use sync_point::sync_point;
use tracing::warn;

use crate::error::StorageResult;
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::iterator::{
    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::utils::{
    filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
    search_sst_idx,
};
use crate::hummock::vector::file::{FileVectorStore, FileVectorStoreCtx};
use crate::hummock::vector::monitor::{VectorStoreCacheStats, report_hnsw_stat};
use crate::hummock::{
    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
    hit_sstable_bloom_filter,
};
use crate::mem_table::{
    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
};
use crate::monitor::{
    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
};
use crate::store::{
    OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
};
use crate::vector::hnsw::nearest;
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

pub type CommittedVersion = PinnedVersion;

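/// Metadata of SSTables that were built from uploaded immutable memtables but are not yet
/// committed into the Hummock version, including which imm batches each local instance
/// contributed.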
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    sstable_infos: Vec<LocalSstableInfo>,
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    epochs: Vec<HummockEpoch>,
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    imm_size: usize,
}

impl StagingSstableInfo {
    pub fn new(
        sstable_infos: Vec<LocalSstableInfo>,
        old_value_sstable_infos: Vec<LocalSstableInfo>,
        epochs: Vec<HummockEpoch>,
        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
        imm_size: usize,
    ) -> Self {
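        // `epochs` are expected to be ordered from the newest epoch to the oldest, as asserted below.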
        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
        Self {
            sstable_infos,
            old_value_sstable_infos,
            epochs,
            imm_ids,
            imm_size,
        }
    }

    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.sstable_infos
    }

    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.old_value_sstable_infos
    }

    pub fn imm_size(&self) -> usize {
        self.imm_size
    }

    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.epochs
    }

    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
        &self.imm_ids
    }
}

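/// An update applied to a [`HummockReadVersion`].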
pub enum VersionUpdate {
    Sst(Arc<StagingSstableInfo>),
    CommittedSnapshot(CommittedVersion),
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}

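/// Uncommitted, in-memory state of a read version: immutable memtables pending upload,
/// immutable memtables currently being uploaded, and uploaded-but-uncommitted SSTs.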
#[derive(Clone)]
pub struct StagingVersion {
    pending_imm_size: usize,
    pub pending_imms: Vec<ImmutableMemtable>,
    pub uploading_imms: VecDeque<ImmutableMemtable>,

    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}

impl StagingVersion {
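    /// Returns the imms and staging SSTs of `table_id` that overlap with `table_key_range`
    /// and are relevant for a read at `max_epoch_inclusive`, with newer imms yielded first.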
    pub fn prune_overlap<'a>(
        &'a self,
        max_epoch_inclusive: HummockEpoch,
        table_id: TableId,
        table_key_range: &'a TableKeyRange,
    ) -> (
        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
        impl Iterator<Item = &'a SstableInfo> + 'a,
    ) {
        let (left, right) = table_key_range;
        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
        let overlapped_imms = self
            .pending_imms
            .iter()
            .rev()
            .chain(self.uploading_imms.iter())
            .filter(move |imm| {
                imm.min_epoch() <= max_epoch_inclusive
                    && imm.table_id == table_id
                    && range_overlap(
                        &(left, right),
                        &imm.start_table_key(),
                        Bound::Included(&imm.end_table_key()),
                    )
            });

        let overlapped_ssts = self
            .sst
            .iter()
            .filter(move |staging_sst| {
                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
                sst_max_epoch <= max_epoch_inclusive
            })
            .flat_map(move |staging_sst| {
                staging_sst
                    .sstable_infos
                    .iter()
                    .map(|sstable| &sstable.sst_info)
                    .filter(move |sstable: &&SstableInfo| {
                        filter_single_sst(sstable, table_id, table_key_range)
                    })
            });
        (overlapped_imms, overlapped_ssts)
    }

    pub fn is_empty(&self) -> bool {
        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
    }
}

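/// Per-table, per-instance read view over Hummock data: local staging data plus the pinned
/// committed version, together with the table watermark index and the vnode bitmap owned by
/// this instance.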
#[derive(Clone)]
pub struct HummockReadVersion {
    table_id: TableId,
    instance_id: LocalInstanceId,

    is_initialized: bool,

    staging: StagingVersion,

    committed: CommittedVersion,

    is_replicated: bool,

    table_watermarks: Option<PkPrefixTableWatermarksIndex>,

    vnodes: Arc<Bitmap>,
}

impl HummockReadVersion {
    pub fn new_with_replication_option(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        is_replicated: bool,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        assert!(committed_version.is_valid());
        Self {
            table_id,
            instance_id,
            table_watermarks: {
                match committed_version.table_watermarks.get(&table_id) {
                    Some(table_watermarks) => match table_watermarks.watermark_type {
                        WatermarkSerdeType::PkPrefix => {
                            Some(PkPrefixTableWatermarksIndex::new_committed(
                                table_watermarks.clone(),
                                committed_version
                                    .state_table_info
                                    .info()
                                    .get(&table_id)
                                    .expect("should exist")
                                    .committed_epoch,
                            ))
                        }
                        WatermarkSerdeType::NonPkPrefix => None,
                        WatermarkSerdeType::Value => None,
                    },
                    None => None,
                }
            },
            staging: StagingVersion {
                pending_imm_size: 0,
                pending_imms: Vec::default(),
                uploading_imms: VecDeque::default(),
                sst: VecDeque::default(),
            },

            committed: committed_version,

            is_replicated,
            vnodes,
            is_initialized: false,
        }
    }

    pub fn new(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn init(&mut self) {
        assert!(!self.is_initialized);
        self.is_initialized = true;
    }

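    /// Adds a newly sealed immutable memtable to the pending list. Batch ids are expected to be
    /// monotonically increasing across pending and uploading imms.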
    pub fn add_imm(&mut self, imm: ImmutableMemtable) {
        assert!(self.is_initialized);
        if let Some(item) = self
            .staging
            .pending_imms
            .last()
            .or_else(|| self.staging.uploading_imms.front())
        {
            debug_assert!(item.batch_id() < imm.batch_id());
        }

        self.staging.pending_imm_size += imm.size();
        self.staging.pending_imms.push(imm);
    }

    pub fn pending_imm_size(&self) -> usize {
        self.staging.pending_imm_size
    }

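    /// Moves all pending imms into the uploading queue (newest at the front) and returns them so
    /// the caller can hand them to the uploader.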
    pub fn start_upload_pending_imms(&mut self) -> Vec<ImmutableMemtable> {
        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
        for imm in &pending_imms {
            self.staging.uploading_imms.push_front(imm.clone());
        }
        self.staging.pending_imm_size = 0;
        pending_imms
    }

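    /// Applies a `VersionUpdate` to this read version:
    /// - `Sst`: replaces the uploading imms that produced the SSTs with the staging SSTs;
    /// - `CommittedSnapshot`: prunes staging data at or below the new committed epoch and
    ///   refreshes the committed version and watermark index;
    /// - `NewTableWatermark`: records a new epoch watermark in the watermark index.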
    pub fn update(&mut self, info: VersionUpdate) {
        match info {
            VersionUpdate::Sst(staging_sst_ref) => {
                {
                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                        warn!(
                            instance_id = self.instance_id,
                            "no related imm in sst input"
                        );
                        return;
                    };

                    for imm_id in imms.iter().rev() {
                        let check_err = match self.staging.uploading_imms.pop_back() {
                            None => Some("empty".to_owned()),
                            Some(prev_imm_id) => {
                                if prev_imm_id.batch_id() == *imm_id {
                                    None
                                } else {
                                    Some(format!(
                                        "mismatched id {} {}",
                                        prev_imm_id.batch_id(),
                                        *imm_id
                                    ))
                                }
                            }
                        };
                        assert!(
                            check_err.is_none(),
                            "should be valid staging_sst.size {},
                                staging_sst.imm_ids {:?},
                                staging_sst.epochs {:?},
                                local_pending_imm_ids {:?},
                                local_uploading_imm_ids {:?},
                                instance_id {}
                                check_err {:?}",
                            staging_sst_ref.imm_size,
                            staging_sst_ref.imm_ids,
                            staging_sst_ref.epochs,
                            self.staging
                                .pending_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.staging
                                .uploading_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.instance_id,
                            check_err
                        );
                    }

                    self.staging.sst.push_front(staging_sst_ref);
                }
            }

            VersionUpdate::CommittedSnapshot(committed_version) => {
                if let Some(info) = committed_version
                    .state_table_info
                    .info()
                    .get(&self.table_id)
                {
                    let committed_epoch = info.committed_epoch;
                    if self.is_replicated {
                        self.staging
                            .uploading_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                        self.staging
                            .pending_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                    } else {
                        self.staging
                            .pending_imms
                            .iter()
                            .chain(self.staging.uploading_imms.iter())
                            .for_each(|imm| {
                                assert!(
                                    imm.min_epoch() > committed_epoch,
                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
                                    imm.table_id,
                                    imm.min_epoch(),
                                    committed_epoch
                                )
                            });
                    }

                    self.staging.sst.retain(|sst| {
                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
                    });

                    assert!(self.staging.sst.iter().all(|sst| {
                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
                    }));

                    if let Some(committed_watermarks) =
                        committed_version.table_watermarks.get(&self.table_id)
                        && let WatermarkSerdeType::PkPrefix = committed_watermarks.watermark_type
                    {
                        if let Some(watermark_index) = &mut self.table_watermarks {
                            watermark_index.apply_committed_watermarks(
                                committed_watermarks.clone(),
                                committed_epoch,
                            );
                        } else {
                            self.table_watermarks =
                                Some(PkPrefixTableWatermarksIndex::new_committed(
                                    committed_watermarks.clone(),
                                    committed_epoch,
                                ));
                        }
                    }
                }

                self.committed = committed_version;
            }
            VersionUpdate::NewTableWatermark {
                direction,
                epoch,
                vnode_watermarks,
                watermark_type,
            } => {
                assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type);
                if let Some(watermark_index) = &mut self.table_watermarks {
                    watermark_index.add_epoch_watermark(
                        epoch,
                        Arc::from(vnode_watermarks),
                        direction,
                    );
                } else {
                    self.table_watermarks = Some(PkPrefixTableWatermarksIndex::new(
                        direction,
                        epoch,
                        vnode_watermarks,
                        self.committed.table_committed_epoch(self.table_id),
                    ));
                }
            }
        }
    }

    pub fn staging(&self) -> &StagingVersion {
        &self.staging
    }

    pub fn committed(&self) -> &CommittedVersion {
        &self.committed
    }

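    /// Drops from `watermarks` any entry that would regress relative to the watermarks already
    /// recorded in this read version's watermark index.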
    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
        if let Some(watermark_index) = &self.table_watermarks {
            watermark_index.filter_regress_watermarks(watermarks)
        }
    }

    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.table_watermarks
            .as_ref()
            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
    }

    pub fn is_initialized(&self) -> bool {
        self.is_initialized
    }

    pub fn is_replicated(&self) -> bool {
        self.is_replicated
    }

    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        std::mem::replace(&mut self.vnodes, vnodes)
    }

    pub fn contains(&self, vnode: VirtualNode) -> bool {
        self.vnodes.is_set(vnode.to_index())
    }

    pub fn vnodes(&self) -> Arc<Bitmap> {
        self.vnodes.clone()
    }
}

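/// Takes a consistent snapshot of a read version for a read at `epoch`: rewrites the key range
/// according to the table watermark index, then collects the overlapping imms and staging SSTs
/// together with the committed version.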
pub fn read_filter_for_version(
    epoch: HummockEpoch,
    table_id: TableId,
    mut table_key_range: TableKeyRange,
    read_version: &RwLock<HummockReadVersion>,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
    let read_version_guard = read_version.read();

    let committed_version = read_version_guard.committed().clone();

    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
    }

    let (imm_iter, sst_iter) =
        read_version_guard
            .staging()
            .prune_overlap(epoch, table_id, &table_key_range);

    let imms = imm_iter.cloned().collect();
    let ssts = sst_iter.cloned().collect();

    Ok((table_key_range, (imms, ssts, committed_version)))
}

#[derive(Clone)]
pub struct HummockVersionReader {
    sstable_store: SstableStoreRef,

    state_store_metrics: Arc<HummockStateStoreMetrics>,
    preload_retry_times: usize,
}

impl HummockVersionReader {
    pub fn new(
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        preload_retry_times: usize,
    ) -> Self {
        Self {
            sstable_store,
            state_store_metrics,
            preload_retry_times,
        }
    }

    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
        &self.state_store_metrics
    }
}

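/// Threshold (in seconds) above which fetching SST metadata while creating an iterator is
/// reported as slow.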
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;

impl HummockVersionReader {
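    /// Point lookup of `table_key` at `epoch`: probes imms first, then uncommitted staging SSTs,
    /// and finally the committed levels, returning the first visible value (if any) mapped
    /// through `on_key_value_fn`.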
    pub async fn get<'a, O>(
        &'a self,
        table_key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: ReadVersionTuple,
        on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
        let local_stats = &mut stats_guard.local_stats;
        local_stats.found_key = true;

        for imm in &imms {
            if imm.max_epoch() < min_epoch {
                continue;
            }

            local_stats.staging_imm_get_count += 1;

            if let Some((data, data_epoch)) = get_from_batch(
                imm,
                TableKey(table_key.as_ref()),
                epoch,
                &read_options,
                local_stats,
            ) {
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    data.into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v.as_ref(),
                            )
                        })
                        .transpose()?
                });
            }
        }

        let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| {
            Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.as_raw_id())
        });

        let full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(table_key.clone()),
            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
        );
        let single_table_key_range = table_key.clone()..=table_key.clone();

        let pruned_uncommitted_ssts =
            prune_overlapping_ssts(&uncommitted_ssts, table_id, &single_table_key_range);
        for local_sst in pruned_uncommitted_ssts {
            local_stats.staging_sst_get_count += 1;
            if let Some(iter) = get_from_sstable_info(
                self.sstable_store.clone(),
                local_sst,
                full_key.to_ref(),
                &read_options,
                dist_key_hash,
                local_stats,
            )
            .await?
            {
                debug_assert!(iter.is_valid());
                let data_epoch = iter.key().epoch_with_gap;
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    iter.value()
                        .into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v,
                            )
                        })
                        .transpose()?
                });
            }
        }
        assert!(committed_version.is_valid());
        for level in committed_version.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            match level.level_type {
                LevelType::Overlapping | LevelType::Unspecified => {
                    let sstable_infos = prune_overlapping_ssts(
                        &level.table_infos,
                        table_id,
                        &single_table_key_range,
                    );
                    for sstable_info in sstable_infos {
                        local_stats.overlapping_get_count += 1;
                        if let Some(iter) = get_from_sstable_info(
                            self.sstable_store.clone(),
                            sstable_info,
                            full_key.to_ref(),
                            &read_options,
                            dist_key_hash,
                            local_stats,
                        )
                        .await?
                        {
                            debug_assert!(iter.is_valid());
                            let data_epoch = iter.key().epoch_with_gap;
                            return Ok(if data_epoch.pure_epoch() < min_epoch {
                                None
                            } else {
                                iter.value()
                                    .into_user_value()
                                    .map(|v| {
                                        on_key_value_fn(
                                            FullKey::new_with_gap_epoch(
                                                table_id,
                                                table_key.to_ref(),
                                                data_epoch,
                                            ),
                                            v,
                                        )
                                    })
                                    .transpose()?
                            });
                        }
                    }
                }
                LevelType::Nonoverlapping => {
                    let mut table_info_idx =
                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                    if table_info_idx == 0 {
                        continue;
                    }
                    table_info_idx = table_info_idx.saturating_sub(1);
                    let ord = level.table_infos[table_info_idx]
                        .key_range
                        .compare_right_with_user_key(full_key.user_key.as_ref());
                    if ord == Ordering::Less {
                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                        continue;
                    }

                    local_stats.non_overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        &level.table_infos[table_info_idx],
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
        }
        stats_guard.local_stats.found_key = false;
        Ok(None)
    }

    pub async fn iter(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
    ) -> StorageResult<HummockStorageIterator> {
        self.iter_with_memtable(
            table_key_range,
            epoch,
            table_id,
            read_options,
            read_version_tuple,
            None,
        )
        .await
    }

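    /// Like [`Self::iter`], but can additionally merge an iterator over a local memtable into
    /// the result.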
    pub async fn iter_with_memtable<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockIterator<'b>>,
    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = ForwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = UserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

    pub async fn rev_iter<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = BackwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = BackwardUserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageRevIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

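    /// Collects the iterator inputs for a range read into `factory`: one iterator per imm, one
    /// per staging SST that passes the bloom filter, a concat or single-SST iterator per
    /// non-overlapping committed level, and one iterator per overlapping committed SST.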
    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        {
            fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
                match bound {
                    Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
                    Bound::Unbounded => None,
                }
            }
            let (left, right) = &table_key_range;
            if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
                && right < left
            {
                if cfg!(debug_assertions) {
                    panic!("invalid iter key range: {table_id} {left:?} {right:?}")
                } else {
                    return Err(HummockError::other(format!(
                        "invalid iter key range: {table_id} {left:?} {right:?}"
                    ))
                    .into());
                }
            }
        }

        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.as_raw_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
                && !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                let mut table_infos =
                    prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref, table_id)
                        .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    let sstable = self
                        .sstable_store
                        .sstable(&sstable_infos[0], local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        &sstable_infos[0],
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                let table_infos =
                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    assert_eq!(sstable_info.object_id, sstable.id);
                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }

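    /// Creates an iterator over the change log of `options.table_id` within `epoch_range`,
    /// merging the new-value and old-value SSTs of the matching change-log entries.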
    pub async fn iter_log(
        &self,
        version: PinnedVersion,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> HummockResult<ChangeLogIterator> {
        let change_log = {
            let table_change_logs = version.table_change_log_read_lock();
            if let Some(change_log) = table_change_logs.get(&options.table_id) {
                change_log.filter_epoch(epoch_range).cloned().collect_vec()
            } else {
                Vec::new()
            }
        };

        if let Some(max_epoch_change_log) = change_log.last() {
            let (_, max_epoch) = epoch_range;
            if !max_epoch_change_log.epochs().contains(&max_epoch) {
                warn!(
                    max_epoch,
                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
                    table_id = %options.table_id,
                    "max_epoch does not exist"
                );
            }
        }
        let read_options = Arc::new(SstableIteratorReadOptions {
            cache_policy: Default::default(),
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });

        async fn make_iter(
            sstable_infos: impl Iterator<Item = &SstableInfo>,
            sstable_store: &SstableStoreRef,
            read_options: Arc<SstableIteratorReadOptions>,
            local_stat: &mut StoreLocalStatistic,
        ) -> HummockResult<MergeIterator<SstableIterator>> {
            let iters = try_join_all(sstable_infos.map(|sstable_info| {
                let sstable_store = sstable_store.clone();
                let read_options = read_options.clone();
                async move {
                    let mut local_stat = StoreLocalStatistic::default();
                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                    Ok::<_, HummockError>((
                        SstableIterator::new(
                            table_holder,
                            sstable_store,
                            read_options,
                            sstable_info,
                        ),
                        local_stat,
                    ))
                }
            }))
            .await?;
            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
                |(iter, stats)| {
                    local_stat.add(&stats);
                    iter
                },
            )))
        }

        let mut local_stat = StoreLocalStatistic::default();

        let new_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.new_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        let old_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.old_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        ChangeLogIterator::new(
            epoch_range,
            key_range,
            new_value_iter,
            old_value_iter,
            options.table_id,
            IterLocalMetricsGuard::new(
                self.state_store_metrics.clone(),
                options.table_id,
                local_stat,
            ),
        )
        .await
    }

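    /// Vector nearest-neighbor search against the vector index of `table_id`, either by a flat
    /// scan over all vector files or by an HNSW graph search, returning the top-n items mapped
    /// through `on_nearest_item_fn`.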
    pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
        &'a self,
        version: PinnedVersion,
        table_id: TableId,
        target: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> HummockResult<Vec<O>> {
        let Some(index) = version.vector_indexes.get(&table_id) else {
            return Ok(vec![]);
        };
        if target.dimension() != index.dimension {
            return Err(HummockError::other(format!(
                "target dimension {} does not match index dimension {}",
                target.dimension(),
                index.dimension
            )));
        }
        match &index.inner {
            VectorIndexImpl::Flat(flat) => {
                let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
                let mut cache_stat = VectorStoreCacheStats::default();
                for vector_file in &flat.vector_store_info.vector_files {
                    let meta = self
                        .sstable_store
                        .get_vector_file_meta(vector_file, &mut cache_stat)
                        .await?;
                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
                        let block = self
                            .sstable_store
                            .get_vector_block(vector_file, i, block_meta, &mut cache_stat)
                            .await?;
                        builder.add(&**block, &on_nearest_item_fn);
                    }
                }
                cache_stat.report(table_id, "flat", self.stats());
                Ok(builder.finish())
            }
            VectorIndexImpl::HnswFlat(hnsw_flat) => {
                let Some(graph_file) = &hnsw_flat.graph_file else {
                    return Ok(vec![]);
                };

                let mut ctx = FileVectorStoreCtx::default();

                let graph = self
                    .sstable_store
                    .get_hnsw_graph(graph_file, &mut ctx.stats)
                    .await?;

                let vector_store =
                    FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
                let (items, stats) = nearest::<O, M, _>(
                    &vector_store,
                    &mut ctx,
                    &*graph,
                    target,
                    on_nearest_item_fn,
                    options.hnsw_ef_search,
                    options.top_n,
                )
                .await?;
                ctx.stats.report(table_id, "hnsw_read", self.stats());
                report_hnsw_stat(
                    self.stats(),
                    table_id,
                    "hnsw_read",
                    options.top_n,
                    options.hnsw_ef_search,
                    [stats],
                );
                Ok(items)
            }
        }
    }
}