use std::cmp::Ordering;
use std::collections::vec_deque::VecDeque;
use std::collections::{Bound, HashMap};
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use futures::future::try_join_all;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::array::VectorRef;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::{
    PkPrefixTableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::LevelType;
use sync_point::sync_point;
use tracing::warn;

use crate::error::StorageResult;
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::iterator::{
    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::utils::{
    filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
    search_sst_idx,
};
use crate::hummock::vector::file::FileVectorStore;
use crate::hummock::{
    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
    hit_sstable_bloom_filter,
};
use crate::mem_table::{
    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
};
use crate::monitor::{
    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
};
use crate::store::{
    OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
};
use crate::vector::hnsw::nearest;
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

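/// A [`PinnedVersion`] that has been committed; the committed base that
/// staging (in-memory and uploading) data is layered on top of.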
pub type CommittedVersion = PinnedVersion;

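/// Metadata of SSTables that have been flushed from immutable memtables but
/// are not yet part of a committed version. `epochs` is kept sorted in
/// descending order (newest epoch first), which `new` asserts.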
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    sstable_infos: Vec<LocalSstableInfo>,
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    epochs: Vec<HummockEpoch>,
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    imm_size: usize,
}

impl StagingSstableInfo {
    pub fn new(
        sstable_infos: Vec<LocalSstableInfo>,
        old_value_sstable_infos: Vec<LocalSstableInfo>,
        epochs: Vec<HummockEpoch>,
        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
        imm_size: usize,
    ) -> Self {
        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
        Self {
            sstable_infos,
            old_value_sstable_infos,
            epochs,
            imm_ids,
            imm_size,
        }
    }

    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.sstable_infos
    }

    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.old_value_sstable_infos
    }

    pub fn imm_size(&self) -> usize {
        self.imm_size
    }

    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.epochs
    }

    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
        &self.imm_ids
    }
}

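/// Updates applied to a [`HummockReadVersion`]: newly uploaded staging SSTs,
/// a newly committed snapshot, or a new per-table watermark.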
pub enum VersionUpdate {
    Sst(Arc<StagingSstableInfo>),
    CommittedSnapshot(CommittedVersion),
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}

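/// Uncommitted data visible to reads: immutable memtables still pending upload
/// (`pending_imms`, oldest first), immutable memtables whose upload is in
/// flight (`uploading_imms`, newest first), and flushed but uncommitted SSTs
/// (`sst`, newest first).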
#[derive(Clone)]
pub struct StagingVersion {
    pending_imm_size: usize,
    pub pending_imms: Vec<ImmutableMemtable>,
    pub uploading_imms: VecDeque<ImmutableMemtable>,

    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}

impl StagingVersion {
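    /// Returns the staging imms and SSTs that may contain data for `table_id`
    /// within `table_key_range` at an epoch no greater than
    /// `max_epoch_inclusive`. Imms are yielded newest first.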
    pub fn prune_overlap<'a>(
        &'a self,
        max_epoch_inclusive: HummockEpoch,
        table_id: TableId,
        table_key_range: &'a TableKeyRange,
    ) -> (
        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
        impl Iterator<Item = &'a SstableInfo> + 'a,
    ) {
        let (left, right) = table_key_range;
        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
        let overlapped_imms = self
            .pending_imms
            .iter()
            .rev()
            .chain(self.uploading_imms.iter())
            .filter(move |imm| {
                imm.min_epoch() <= max_epoch_inclusive
                    && imm.table_id == table_id
                    && range_overlap(
                        &(left, right),
                        &imm.start_table_key(),
                        Bound::Included(&imm.end_table_key()),
                    )
            });

        let overlapped_ssts = self
            .sst
            .iter()
            .filter(move |staging_sst| {
                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
                sst_max_epoch <= max_epoch_inclusive
            })
            .flat_map(move |staging_sst| {
                staging_sst
                    .sstable_infos
                    .iter()
                    .map(|sstable| &sstable.sst_info)
                    .filter(move |sstable: &&SstableInfo| {
                        filter_single_sst(sstable, table_id, table_key_range)
                    })
            });
        (overlapped_imms, overlapped_ssts)
    }

    pub fn is_empty(&self) -> bool {
        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
    }
}

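/// A read snapshot owned by a single local state-store instance. It layers the
/// instance's staging data ([`StagingVersion`]) over a committed version, and
/// tracks per-table watermarks and the vnode bitmap served by this instance.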
#[derive(Clone)]
pub struct HummockReadVersion {
    table_id: TableId,
    instance_id: LocalInstanceId,

    staging: StagingVersion,

    committed: CommittedVersion,

    is_replicated: bool,

    table_watermarks: Option<PkPrefixTableWatermarksIndex>,

    vnodes: Arc<Bitmap>,
}

impl HummockReadVersion {
    pub fn new_with_replication_option(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        is_replicated: bool,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        assert!(committed_version.is_valid());
        Self {
            table_id,
            instance_id,
            table_watermarks: {
                match committed_version.table_watermarks.get(&table_id) {
                    Some(table_watermarks) => match table_watermarks.watermark_type {
                        WatermarkSerdeType::PkPrefix => {
                            Some(PkPrefixTableWatermarksIndex::new_committed(
                                table_watermarks.clone(),
                                committed_version
                                    .state_table_info
                                    .info()
                                    .get(&table_id)
                                    .expect("should exist")
                                    .committed_epoch,
                            ))
                        }

                        WatermarkSerdeType::NonPkPrefix => None,
                    },
                    None => None,
                }
            },
            staging: StagingVersion {
                pending_imm_size: 0,
                pending_imms: Vec::default(),
                uploading_imms: VecDeque::default(),
                sst: VecDeque::default(),
            },

            committed: committed_version,

            is_replicated,
            vnodes,
        }
    }

    pub fn new(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn add_imm(&mut self, imm: ImmutableMemtable) {
        if let Some(item) = self
            .staging
            .pending_imms
            .last()
            .or_else(|| self.staging.uploading_imms.front())
        {
            debug_assert!(item.batch_id() < imm.batch_id());
        }

        self.staging.pending_imm_size += imm.size();
        self.staging.pending_imms.push(imm);
    }

    pub fn pending_imm_size(&self) -> usize {
        self.staging.pending_imm_size
    }

    pub fn start_upload_pending_imms(&mut self) -> Vec<ImmutableMemtable> {
        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
        for imm in &pending_imms {
            self.staging.uploading_imms.push_front(imm.clone());
        }
        self.staging.pending_imm_size = 0;
        pending_imms
    }

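    /// Applies a [`VersionUpdate`] to this read version.
    ///
    /// For `Sst`, the uploaded SST must correspond to the oldest uploading
    /// imms of this instance, which are popped and replaced by the SST entry.
    /// For `CommittedSnapshot`, staging data already covered by the new
    /// committed epoch is dropped and the committed watermark index is
    /// refreshed. For `NewTableWatermark`, the watermark is recorded in the
    /// pk-prefix watermark index.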
    pub fn update(&mut self, info: VersionUpdate) {
        match info {
            VersionUpdate::Sst(staging_sst_ref) => {
                {
                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                        warn!(
                            instance_id = self.instance_id,
                            "no related imm in sst input"
                        );
                        return;
                    };

                    for imm_id in imms.iter().rev() {
                        let check_err = match self.staging.uploading_imms.pop_back() {
                            None => Some("empty".to_owned()),
                            Some(prev_imm_id) => {
                                if prev_imm_id.batch_id() == *imm_id {
                                    None
                                } else {
                                    Some(format!(
                                        "mismatched id {} {}",
                                        prev_imm_id.batch_id(),
                                        *imm_id
                                    ))
                                }
                            }
                        };
                        assert!(
                            check_err.is_none(),
                            "should be valid staging_sst.size {},
                            staging_sst.imm_ids {:?},
                            staging_sst.epochs {:?},
                            local_pending_imm_ids {:?},
                            local_uploading_imm_ids {:?},
                            instance_id {}
                            check_err {:?}",
                            staging_sst_ref.imm_size,
                            staging_sst_ref.imm_ids,
                            staging_sst_ref.epochs,
                            self.staging
                                .pending_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.staging
                                .uploading_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.instance_id,
                            check_err
                        );
                    }

                    self.staging.sst.push_front(staging_sst_ref);
                }
            }

            VersionUpdate::CommittedSnapshot(committed_version) => {
                if let Some(info) = committed_version
                    .state_table_info
                    .info()
                    .get(&self.table_id)
                {
                    let committed_epoch = info.committed_epoch;
                    if self.is_replicated {
                        self.staging
                            .uploading_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                        self.staging
                            .pending_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                    } else {
                        self.staging
                            .pending_imms
                            .iter()
                            .chain(self.staging.uploading_imms.iter())
                            .for_each(|imm| {
                                assert!(
                                    imm.min_epoch() > committed_epoch,
                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
                                    imm.table_id,
                                    imm.min_epoch(),
                                    committed_epoch
                                )
                            });
                    }

                    self.staging.sst.retain(|sst| {
                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
                    });

                    assert!(self.staging.sst.iter().all(|sst| {
                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
                    }));

                    if let Some(committed_watermarks) =
                        committed_version.table_watermarks.get(&self.table_id)
                        && let WatermarkSerdeType::PkPrefix = committed_watermarks.watermark_type
                    {
                        if let Some(watermark_index) = &mut self.table_watermarks {
                            watermark_index.apply_committed_watermarks(
                                committed_watermarks.clone(),
                                committed_epoch,
                            );
                        } else {
                            self.table_watermarks =
                                Some(PkPrefixTableWatermarksIndex::new_committed(
                                    committed_watermarks.clone(),
                                    committed_epoch,
                                ));
                        }
                    }
                }

                self.committed = committed_version;
            }
            VersionUpdate::NewTableWatermark {
                direction,
                epoch,
                vnode_watermarks,
                watermark_type,
            } => {
                assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type);
                if let Some(watermark_index) = &mut self.table_watermarks {
                    watermark_index.add_epoch_watermark(
                        epoch,
                        Arc::from(vnode_watermarks),
                        direction,
                    );
                } else {
                    self.table_watermarks = Some(PkPrefixTableWatermarksIndex::new(
                        direction,
                        epoch,
                        vnode_watermarks,
                        self.committed.table_committed_epoch(self.table_id),
                    ));
                }
            }
        }
    }

    pub fn staging(&self) -> &StagingVersion {
        &self.staging
    }

    pub fn committed(&self) -> &CommittedVersion {
        &self.committed
    }

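    /// Removes from `watermarks` any vnode watermark that would regress (move
    /// backwards relative to) the latest watermark already tracked in the
    /// table watermark index, if such an index exists.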
    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
        if let Some(watermark_index) = &self.table_watermarks {
            watermark_index.filter_regress_watermarks(watermarks)
        }
    }

    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.table_watermarks
            .as_ref()
            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
    }

    pub fn is_replicated(&self) -> bool {
        self.is_replicated
    }

    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        std::mem::replace(&mut self.vnodes, vnodes)
    }

    pub fn contains(&self, vnode: VirtualNode) -> bool {
        self.vnodes.is_set(vnode.to_index())
    }

    pub fn vnodes(&self) -> Arc<Bitmap> {
        self.vnodes.clone()
    }
}

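/// Takes a snapshot of the read version for a read at `epoch`: clones the
/// committed version, rewrites the key range using the table watermark index
/// (so watermark-trimmed data is skipped), and collects the overlapping
/// staging imms and SSTs. The read lock is held only for the duration of this
/// call.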
pub fn read_filter_for_version(
    epoch: HummockEpoch,
    table_id: TableId,
    mut table_key_range: TableKeyRange,
    read_version: &RwLock<HummockReadVersion>,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
    let read_version_guard = read_version.read();

    let committed_version = read_version_guard.committed().clone();

    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
    }

    let (imm_iter, sst_iter) =
        read_version_guard
            .staging()
            .prune_overlap(epoch, table_id, &table_key_range);

    let imms = imm_iter.cloned().collect();
    let ssts = sst_iter.cloned().collect();

    Ok((table_key_range, (imms, ssts, committed_version)))
}

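/// Stateless reader over a resolved [`ReadVersionTuple`]: it owns the sstable
/// store handle and metrics, and implements point get, forward/backward range
/// iteration, change-log iteration, and vector nearest-neighbor lookup.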
#[derive(Clone)]
pub struct HummockVersionReader {
    sstable_store: SstableStoreRef,

    state_store_metrics: Arc<HummockStateStoreMetrics>,
    preload_retry_times: usize,
}

impl HummockVersionReader {
    pub fn new(
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        preload_retry_times: usize,
    ) -> Self {
        Self {
            sstable_store,
            state_store_metrics,
            preload_retry_times,
        }
    }

    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
        &self.state_store_metrics
    }
}

const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;

impl HummockVersionReader {
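    /// Point lookup of `table_key` at `epoch`. Searches newest to oldest:
    /// staging imms, then uncommitted SSTs, then committed levels (overlapping
    /// levels are probed per matching SST; non-overlapping levels via binary
    /// search). Returns the first visible version of the key whose epoch is
    /// within `[min_epoch, epoch]`, mapped through `on_key_value_fn`.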
    pub async fn get<'a, O>(
        &'a self,
        table_key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: ReadVersionTuple,
        on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
        let local_stats = &mut stats_guard.local_stats;
        local_stats.found_key = true;

        for imm in &imms {
            if imm.max_epoch() < min_epoch {
                continue;
            }

            local_stats.staging_imm_get_count += 1;

            if let Some((data, data_epoch)) = get_from_batch(
                imm,
                TableKey(table_key.as_ref()),
                epoch,
                &read_options,
                local_stats,
            ) {
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    data.into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v.as_ref(),
                            )
                        })
                        .transpose()?
                });
            }
        }

        let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| {
            Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.as_raw_id())
        });

        let full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(table_key.clone()),
            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
        );
        for local_sst in &uncommitted_ssts {
            local_stats.staging_sst_get_count += 1;
            if let Some(iter) = get_from_sstable_info(
                self.sstable_store.clone(),
                local_sst,
                full_key.to_ref(),
                &read_options,
                dist_key_hash,
                local_stats,
            )
            .await?
            {
                debug_assert!(iter.is_valid());
                let data_epoch = iter.key().epoch_with_gap;
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    iter.value()
                        .into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v,
                            )
                        })
                        .transpose()?
                });
            }
        }
        let single_table_key_range = table_key.clone()..=table_key.clone();
        assert!(committed_version.is_valid());
        for level in committed_version.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            match level.level_type {
                LevelType::Overlapping | LevelType::Unspecified => {
                    let sstable_infos = prune_overlapping_ssts(
                        &level.table_infos,
                        table_id,
                        &single_table_key_range,
                    );
                    for sstable_info in sstable_infos {
                        local_stats.overlapping_get_count += 1;
                        if let Some(iter) = get_from_sstable_info(
                            self.sstable_store.clone(),
                            sstable_info,
                            full_key.to_ref(),
                            &read_options,
                            dist_key_hash,
                            local_stats,
                        )
                        .await?
                        {
                            debug_assert!(iter.is_valid());
                            let data_epoch = iter.key().epoch_with_gap;
                            return Ok(if data_epoch.pure_epoch() < min_epoch {
                                None
                            } else {
                                iter.value()
                                    .into_user_value()
                                    .map(|v| {
                                        on_key_value_fn(
                                            FullKey::new_with_gap_epoch(
                                                table_id,
                                                table_key.to_ref(),
                                                data_epoch,
                                            ),
                                            v,
                                        )
                                    })
                                    .transpose()?
                            });
                        }
                    }
                }
                LevelType::Nonoverlapping => {
                    let mut table_info_idx =
                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                    if table_info_idx == 0 {
                        continue;
                    }
                    table_info_idx = table_info_idx.saturating_sub(1);
                    let ord = level.table_infos[table_info_idx]
                        .key_range
                        .compare_right_with_user_key(full_key.user_key.as_ref());
                    if ord == Ordering::Less {
                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                        continue;
                    }

                    local_stats.non_overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        &level.table_infos[table_info_idx],
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
        }
        stats_guard.local_stats.found_key = false;
        Ok(None)
    }

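    /// Forward range iterator over `table_key_range` at `epoch`; a thin
    /// wrapper around [`Self::iter_with_memtable`] without a memtable iterator.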
    pub async fn iter(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
    ) -> StorageResult<HummockStorageIterator> {
        self.iter_with_memtable(
            table_key_range,
            epoch,
            table_id,
            read_options,
            read_version_tuple,
            None,
        )
        .await
    }

    pub async fn iter_with_memtable<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockIterator<'b>>,
    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = ForwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = UserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

    pub async fn rev_iter<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = BackwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = BackwardUserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageRevIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

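    /// Shared setup for forward and backward iteration: validates the key
    /// range, then feeds the iterator `factory` with batch iterators for the
    /// imms, SST iterators for the uncommitted SSTs (bloom-filter pruned), and
    /// concat or per-SST iterators for the committed levels, recording
    /// iterator counts and slow meta-fetch metrics along the way.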
    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        {
            fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
                match bound {
                    Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
                    Bound::Unbounded => None,
                }
            }
            let (left, right) = &table_key_range;
            if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
                && right < left
            {
                if cfg!(debug_assertions) {
                    panic!("invalid iter key range: {table_id} {left:?} {right:?}")
                } else {
                    return Err(HummockError::other(format!(
                        "invalid iter key range: {table_id} {left:?} {right:?}"
                    ))
                    .into());
                }
            }
        }

        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.as_raw_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
                && !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                let mut table_infos =
                    prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref, table_id)
                        .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    let sstable = self
                        .sstable_store
                        .sstable(&sstable_infos[0], local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        &sstable_infos[0],
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                let table_infos =
                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    assert_eq!(sstable_info.object_id, sstable.id);
                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }

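    /// Builds a [`ChangeLogIterator`] over the table's change log for
    /// `epoch_range`, merging the new-value and old-value SSTs of every
    /// matching epoch log. Logs a warning if the requested max epoch is absent
    /// from the change log.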
    pub async fn iter_log(
        &self,
        version: PinnedVersion,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> HummockResult<ChangeLogIterator> {
        let change_log = {
            let table_change_logs = version.table_change_log_read_lock();
            if let Some(change_log) = table_change_logs.get(&options.table_id) {
                change_log.filter_epoch(epoch_range).cloned().collect_vec()
            } else {
                Vec::new()
            }
        };

        if let Some(max_epoch_change_log) = change_log.last() {
            let (_, max_epoch) = epoch_range;
            if !max_epoch_change_log.epochs().contains(&max_epoch) {
                warn!(
                    max_epoch,
                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
                    table_id = %options.table_id,
                    "max_epoch does not exist"
                );
            }
        }
        let read_options = Arc::new(SstableIteratorReadOptions {
            cache_policy: Default::default(),
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });

        async fn make_iter(
            sstable_infos: impl Iterator<Item = &SstableInfo>,
            sstable_store: &SstableStoreRef,
            read_options: Arc<SstableIteratorReadOptions>,
            local_stat: &mut StoreLocalStatistic,
        ) -> HummockResult<MergeIterator<SstableIterator>> {
            let iters = try_join_all(sstable_infos.map(|sstable_info| {
                let sstable_store = sstable_store.clone();
                let read_options = read_options.clone();
                async move {
                    let mut local_stat = StoreLocalStatistic::default();
                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                    Ok::<_, HummockError>((
                        SstableIterator::new(
                            table_holder,
                            sstable_store,
                            read_options,
                            sstable_info,
                        ),
                        local_stat,
                    ))
                }
            }))
            .await?;
            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
                |(iter, stats)| {
                    local_stat.add(&stats);
                    iter
                },
            )))
        }

        let mut local_stat = StoreLocalStatistic::default();

        let new_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.new_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        let old_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.old_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        ChangeLogIterator::new(
            epoch_range,
            key_range,
            new_value_iter,
            old_value_iter,
            options.table_id,
            IterLocalMetricsGuard::new(
                self.state_store_metrics.clone(),
                options.table_id,
                local_stat,
            ),
        )
        .await
    }

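    /// Vector nearest-neighbor lookup on the table's vector index. Returns an
    /// empty result when no index (or, for HNSW, no graph file) exists, and an
    /// error on dimension mismatch. Flat indexes are scanned exhaustively
    /// block by block; HNSW-flat indexes are searched through the HNSW graph.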
    pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
        &'a self,
        version: PinnedVersion,
        table_id: TableId,
        target: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> HummockResult<Vec<O>> {
        let Some(index) = version.vector_indexes.get(&table_id) else {
            return Ok(vec![]);
        };
        if target.dimension() != index.dimension {
            return Err(HummockError::other(format!(
                "target dimension {} does not match index dimension {}",
                target.dimension(),
                index.dimension
            )));
        }
        match &index.inner {
            VectorIndexImpl::Flat(flat) => {
                let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
                for vector_file in &flat.vector_store_info.vector_files {
                    let meta = self.sstable_store.get_vector_file_meta(vector_file).await?;
                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
                        let block = self
                            .sstable_store
                            .get_vector_block(vector_file, i, block_meta)
                            .await?;
                        builder.add(&**block, &on_nearest_item_fn);
                    }
                }
                Ok(builder.finish())
            }
            VectorIndexImpl::HnswFlat(hnsw_flat) => {
                let Some(graph_file) = &hnsw_flat.graph_file else {
                    return Ok(vec![]);
                };

                let graph = self.sstable_store.get_hnsw_graph(graph_file).await?;

                let vector_store =
                    FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
                let (items, _stats) = nearest::<O, M>(
                    &vector_store,
                    &*graph,
                    target,
                    on_nearest_item_fn,
                    options.hnsw_ef_search,
                    options.top_n,
                )
                .await?;
                Ok(items)
            }
        }
    }
}