use std::cmp::Ordering;
use std::collections::vec_deque::VecDeque;
use std::collections::{Bound, HashMap};
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use futures::future::try_join_all;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::array::VectorRef;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::{
    PkPrefixTableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::LevelType;
use sync_point::sync_point;
use tracing::warn;

use crate::error::StorageResult;
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::iterator::{
    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::utils::{
    filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
    search_sst_idx,
};
use crate::hummock::vector::file::FileVectorStore;
use crate::hummock::{
    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
    hit_sstable_bloom_filter,
};
use crate::mem_table::{
    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
};
use crate::monitor::{
    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
};
use crate::store::{
    OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
};
use crate::vector::hnsw::nearest;
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

pub type CommittedVersion = PinnedVersion;

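/// Metadata of SSTs that have been uploaded but are not yet part of a
/// committed version; kept as staging data in `HummockReadVersion` until the
/// epochs they cover are committed.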
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    /// SSTs of newly written key-value pairs.
    sstable_infos: Vec<LocalSstableInfo>,
    /// SSTs holding the old values of changed keys, if any.
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    /// Epochs covered by the SSTs, sorted newest first (checked in `new`).
    epochs: Vec<HummockEpoch>,
    /// Ids of the flushed immutable memtables, grouped by local instance.
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    /// Total size of the flushed immutable memtables.
    imm_size: usize,
}

impl StagingSstableInfo {
    pub fn new(
        sstable_infos: Vec<LocalSstableInfo>,
        old_value_sstable_infos: Vec<LocalSstableInfo>,
        epochs: Vec<HummockEpoch>,
        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
        imm_size: usize,
    ) -> Self {
        // `epochs` must be sorted in descending order, i.e. the newest epoch first
        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
        Self {
            sstable_infos,
            old_value_sstable_infos,
            epochs,
            imm_ids,
            imm_size,
        }
    }

    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.sstable_infos
    }

    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.old_value_sstable_infos
    }

    pub fn imm_size(&self) -> usize {
        self.imm_size
    }

    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.epochs
    }

    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
        &self.imm_ids
    }
}

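/// Updates applied to a `HummockReadVersion`: a newly uploaded staging SST, a
/// newly committed snapshot, or a new table watermark.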
pub enum VersionUpdate {
    Sst(Arc<StagingSstableInfo>),
    CommittedSnapshot(CommittedVersion),
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}

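/// Data not yet committed to the shared storage version: immutable memtables
/// plus uploaded-but-uncommitted SSTs.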
#[derive(Clone)]
pub struct StagingVersion {
    pending_imm_size: usize,
    /// Imms that are not yet being uploaded; the newest imm is at the back.
    pub pending_imms: Vec<ImmutableMemtable>,
    /// Imms that are being uploaded; the newest imm is at the front.
    pub uploading_imms: VecDeque<ImmutableMemtable>,

    /// Uploaded but uncommitted SSTs; the newest SST is at the front.
    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}

impl StagingVersion {
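    /// Prunes the staging imms and SSTs down to those that may contain keys in
    /// `table_key_range` for `table_id` and are visible at
    /// `max_epoch_inclusive`. Both returned iterators yield newer data first.
    ///
    /// A sketch of how the result is typically consumed (illustrative only;
    /// see `read_filter_for_version` below):
    /// ```ignore
    /// let (imm_iter, sst_iter) =
    ///     staging.prune_overlap(epoch, table_id, &table_key_range);
    /// let imms: Vec<_> = imm_iter.cloned().collect();
    /// let ssts: Vec<_> = sst_iter.cloned().collect();
    /// ```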
    pub fn prune_overlap<'a>(
        &'a self,
        max_epoch_inclusive: HummockEpoch,
        table_id: TableId,
        table_key_range: &'a TableKeyRange,
    ) -> (
        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
        impl Iterator<Item = &'a SstableInfo> + 'a,
    ) {
        let (left, right) = table_key_range;
        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
        let overlapped_imms = self
            .pending_imms
            .iter()
            .rev() // reversed so that the newer imms come first
            .chain(self.uploading_imms.iter())
            .filter(move |imm| {
                imm.min_epoch() <= max_epoch_inclusive
                    && imm.table_id == table_id
                    && range_overlap(
                        &(left, right),
                        &imm.start_table_key(),
                        Bound::Included(&imm.end_table_key()),
                    )
            });

        let overlapped_ssts = self
            .sst
            .iter()
            .filter(move |staging_sst| {
                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
                sst_max_epoch <= max_epoch_inclusive
            })
            .flat_map(move |staging_sst| {
                staging_sst
                    .sstable_infos
                    .iter()
                    .map(|sstable| &sstable.sst_info)
                    .filter(move |sstable: &&SstableInfo| {
                        filter_single_sst(sstable, table_id, table_key_range)
                    })
            });
        (overlapped_imms, overlapped_ssts)
    }

    pub fn is_empty(&self) -> bool {
        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
    }
}

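/// A container of the information required for reading from hummock: staging
/// data local to this instance plus the shared committed version.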
#[derive(Clone)]
pub struct HummockReadVersion {
    table_id: TableId,
    instance_id: LocalInstanceId,

    /// Local version for staging data.
    staging: StagingVersion,

    /// Remote version for committed data.
    committed: CommittedVersion,

    /// Whether this read version replicates data owned elsewhere; replicated
    /// staging data is simply dropped once its epoch is committed (see
    /// `update`), instead of being asserted against.
    is_replicated: bool,

    table_watermarks: Option<PkPrefixTableWatermarksIndex>,

    /// The vnodes this read version is responsible for.
    vnodes: Arc<Bitmap>,
}

impl HummockReadVersion {
    pub fn new_with_replication_option(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        is_replicated: bool,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        assert!(committed_version.is_valid());
        Self {
            table_id,
            instance_id,
            table_watermarks: {
                match committed_version.table_watermarks.get(&table_id) {
                    Some(table_watermarks) => match table_watermarks.watermark_type {
                        WatermarkSerdeType::PkPrefix => {
                            Some(PkPrefixTableWatermarksIndex::new_committed(
                                table_watermarks.clone(),
                                committed_version
                                    .state_table_info
                                    .info()
                                    .get(&table_id)
                                    .expect("should exist")
                                    .committed_epoch,
                            ))
                        }
                        // non-pk-prefix watermarks are not indexed here
                        WatermarkSerdeType::NonPkPrefix => None,
                    },
                    None => None,
                }
            },
            staging: StagingVersion {
                pending_imm_size: 0,
                pending_imms: Vec::default(),
                uploading_imms: VecDeque::default(),
                sst: VecDeque::default(),
            },

            committed: committed_version,

            is_replicated,
            vnodes,
        }
    }

    pub fn new(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn add_imm(&mut self, imm: ImmutableMemtable) {
        if let Some(item) = self
            .staging
            .pending_imms
            .last()
            .or_else(|| self.staging.uploading_imms.front())
        {
            // imms must arrive in increasing batch-id order
            debug_assert!(item.batch_id() < imm.batch_id());
        }

        self.staging.pending_imm_size += imm.size();
        self.staging.pending_imms.push(imm);
    }

    pub fn pending_imm_size(&self) -> usize {
        self.staging.pending_imm_size
    }

    pub fn start_upload_pending_imms(&mut self) -> Vec<ImmutableMemtable> {
        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
        for imm in &pending_imms {
            self.staging.uploading_imms.push_front(imm.clone());
        }
        self.staging.pending_imm_size = 0;
        pending_imms
    }

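    /// Applies a `VersionUpdate`:
    /// - `Sst`: pops the corresponding imms from `uploading_imms` (oldest
    ///   first) and pushes the staging SST to the front of `staging.sst`.
    /// - `CommittedSnapshot`: replaces the committed version, drops staging
    ///   data at committed epochs, and refreshes the committed watermarks.
    /// - `NewTableWatermark`: records a new per-vnode table watermark.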
    pub fn update(&mut self, info: VersionUpdate) {
        match info {
            VersionUpdate::Sst(staging_sst_ref) => {
                let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                    warn!(
                        instance_id = self.instance_id,
                        "no related imm in sst input"
                    );
                    return;
                };

                // imm ids are ordered newest first, so iterate in reverse to
                // pop the oldest uploading imm first
                for imm_id in imms.iter().rev() {
                    let check_err = match self.staging.uploading_imms.pop_back() {
                        None => Some("empty".to_owned()),
                        Some(prev_imm_id) => {
                            if prev_imm_id.batch_id() == *imm_id {
                                None
                            } else {
                                Some(format!(
                                    "mismatched imm id {} {}",
                                    prev_imm_id.batch_id(),
                                    *imm_id
                                ))
                            }
                        }
                    };
                    assert!(
                        check_err.is_none(),
                        "should be valid: staging_sst.size {}, staging_sst.imm_ids {:?}, \
                         staging_sst.epochs {:?}, local_pending_imm_ids {:?}, \
                         local_uploading_imm_ids {:?}, instance_id {}, check_err {:?}",
                        staging_sst_ref.imm_size,
                        staging_sst_ref.imm_ids,
                        staging_sst_ref.epochs,
                        self.staging
                            .pending_imms
                            .iter()
                            .map(|imm| imm.batch_id())
                            .collect_vec(),
                        self.staging
                            .uploading_imms
                            .iter()
                            .map(|imm| imm.batch_id())
                            .collect_vec(),
                        self.instance_id,
                        check_err
                    );
                }

                self.staging.sst.push_front(staging_sst_ref);
            }

            VersionUpdate::CommittedSnapshot(committed_version) => {
                if let Some(info) = committed_version
                    .state_table_info
                    .info()
                    .get(&self.table_id)
                {
                    let committed_epoch = info.committed_epoch;
                    if self.is_replicated {
                        // replicated data is not uploaded, so imms at committed
                        // epochs are simply dropped
                        self.staging
                            .uploading_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                        self.staging
                            .pending_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                    } else {
                        self.staging
                            .pending_imms
                            .iter()
                            .chain(self.staging.uploading_imms.iter())
                            .for_each(|imm| {
                                assert!(
                                    imm.min_epoch() > committed_epoch,
                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
                                    imm.table_id,
                                    imm.min_epoch(),
                                    committed_epoch
                                )
                            });
                    }

                    // `epochs` are sorted newest first, so `first()` is the max
                    // epoch of each staging SST
                    self.staging.sst.retain(|sst| {
                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
                    });

                    // the remaining SSTs must not cover any committed epoch
                    assert!(self.staging.sst.iter().all(|sst| {
                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
                    }));

                    if let Some(committed_watermarks) =
                        committed_version.table_watermarks.get(&self.table_id)
                        && let WatermarkSerdeType::PkPrefix = committed_watermarks.watermark_type
                    {
                        if let Some(watermark_index) = &mut self.table_watermarks {
                            watermark_index.apply_committed_watermarks(
                                committed_watermarks.clone(),
                                committed_epoch,
                            );
                        } else {
                            self.table_watermarks =
                                Some(PkPrefixTableWatermarksIndex::new_committed(
                                    committed_watermarks.clone(),
                                    committed_epoch,
                                ));
                        }
                    }
                }

                self.committed = committed_version;
            }
            VersionUpdate::NewTableWatermark {
                direction,
                epoch,
                vnode_watermarks,
                watermark_type,
            } => {
                assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type);
                if let Some(watermark_index) = &mut self.table_watermarks {
                    watermark_index.add_epoch_watermark(
                        epoch,
                        Arc::from(vnode_watermarks),
                        direction,
                    );
                } else {
                    self.table_watermarks = Some(PkPrefixTableWatermarksIndex::new(
                        direction,
                        epoch,
                        vnode_watermarks,
                        self.committed.table_committed_epoch(self.table_id),
                    ));
                }
            }
        }
    }

    pub fn staging(&self) -> &StagingVersion {
        &self.staging
    }

    pub fn committed(&self) -> &CommittedVersion {
        &self.committed
    }

    /// Filters out of `watermarks` the entries that would regress the
    /// watermarks already tracked in the index.
    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
        if let Some(watermark_index) = &self.table_watermarks {
            watermark_index.filter_regress_watermarks(watermarks)
        }
    }

    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.table_watermarks
            .as_ref()
            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
    }

    pub fn is_replicated(&self) -> bool {
        self.is_replicated
    }

    /// Replaces the vnode bitmap and returns the previous one.
    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        std::mem::replace(&mut self.vnodes, vnodes)
    }

    pub fn contains(&self, vnode: VirtualNode) -> bool {
        self.vnodes.is_set(vnode.to_index())
    }

    pub fn vnodes(&self) -> Arc<Bitmap> {
        self.vnodes.clone()
    }
}

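/// Takes a short read lock on the read version, rewrites the key range by any
/// table watermark, and returns the pruned staging data together with the
/// committed version.
///
/// A sketch of the expected call pattern (names are illustrative):
/// ```ignore
/// let (key_range, (imms, ssts, committed)) =
///     read_filter_for_version(epoch, table_id, key_range, &read_version)?;
/// ```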
pub fn read_filter_for_version(
    epoch: HummockEpoch,
    table_id: TableId,
    mut table_key_range: TableKeyRange,
    read_version: &RwLock<HummockReadVersion>,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
    let read_version_guard = read_version.read();

    let committed_version = read_version_guard.committed().clone();

    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
    }

    let (imm_iter, sst_iter) =
        read_version_guard
            .staging()
            .prune_overlap(epoch, table_id, &table_key_range);

    let imms = imm_iter.cloned().collect();
    let ssts = sst_iter.cloned().collect();

    Ok((table_key_range, (imms, ssts, committed_version)))
}

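/// A stateless reader over hummock versions: given a read-version tuple, it
/// serves point gets, range iterators, change-log reads and vector lookups
/// against the SST store.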
#[derive(Clone)]
pub struct HummockVersionReader {
    sstable_store: SstableStoreRef,

    state_store_metrics: Arc<HummockStateStoreMetrics>,
    preload_retry_times: usize,
}

impl HummockVersionReader {
    pub fn new(
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        preload_retry_times: usize,
    ) -> Self {
        Self {
            sstable_store,
            state_store_metrics,
            preload_retry_times,
        }
    }

    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
        &self.state_store_metrics
    }
}

const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;

impl HummockVersionReader {
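    /// Point lookup of `table_key` at `epoch`. Sources are consulted newest
    /// first: staging imms, then staging (uncommitted) SSTs, then the levels
    /// of the committed version; the first visible version of the key wins.
    /// Returns `None` if the key is absent or only visible below the
    /// retention horizon.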
    pub async fn get<'a, O>(
        &'a self,
        table_key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: ReadVersionTuple,
        on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
        let local_stats = &mut stats_guard.local_stats;
        local_stats.found_key = true;

        // 1. read from staging imms
        for imm in &imms {
            // skip imms that only hold data below the retention horizon
            if imm.max_epoch() < min_epoch {
                continue;
            }

            local_stats.staging_imm_get_count += 1;

            if let Some((data, data_epoch)) = get_from_batch(
                imm,
                TableKey(table_key.as_ref()),
                epoch,
                &read_options,
                local_stats,
            ) {
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    data.into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v.as_ref(),
                            )
                        })
                        .transpose()?
                });
            }
        }

        // 2. read from staging (uncommitted) SSTs
        let dist_key_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|dist_key| Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.table_id()));

        // seek with the maximum spill offset so that no entry written at
        // `epoch` is skipped
        let full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(table_key.clone()),
            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
        );
        for local_sst in &uncommitted_ssts {
            local_stats.staging_sst_get_count += 1;
            if let Some(iter) = get_from_sstable_info(
                self.sstable_store.clone(),
                local_sst,
                full_key.to_ref(),
                &read_options,
                dist_key_hash,
                local_stats,
            )
            .await?
            {
                debug_assert!(iter.is_valid());
                let data_epoch = iter.key().epoch_with_gap;
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    iter.value()
                        .into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v,
                            )
                        })
                        .transpose()?
                });
            }
        }

        // 3. read from the committed version, level by level
        let single_table_key_range = table_key.clone()..=table_key.clone();
        assert!(committed_version.is_valid());
        for level in committed_version.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            match level.level_type {
                LevelType::Overlapping | LevelType::Unspecified => {
                    let sstable_infos = prune_overlapping_ssts(
                        &level.table_infos,
                        table_id,
                        &single_table_key_range,
                    );
                    for sstable_info in sstable_infos {
                        local_stats.overlapping_get_count += 1;
                        if let Some(iter) = get_from_sstable_info(
                            self.sstable_store.clone(),
                            sstable_info,
                            full_key.to_ref(),
                            &read_options,
                            dist_key_hash,
                            local_stats,
                        )
                        .await?
                        {
                            debug_assert!(iter.is_valid());
                            let data_epoch = iter.key().epoch_with_gap;
                            return Ok(if data_epoch.pure_epoch() < min_epoch {
                                None
                            } else {
                                iter.value()
                                    .into_user_value()
                                    .map(|v| {
                                        on_key_value_fn(
                                            FullKey::new_with_gap_epoch(
                                                table_id,
                                                table_key.to_ref(),
                                                data_epoch,
                                            ),
                                            v,
                                        )
                                    })
                                    .transpose()?
                            });
                        }
                    }
                }
                LevelType::Nonoverlapping => {
                    let mut table_info_idx =
                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                    if table_info_idx == 0 {
                        continue;
                    }
                    table_info_idx = table_info_idx.saturating_sub(1);
                    let ord = level.table_infos[table_info_idx]
                        .key_range
                        .compare_right_with_user_key(full_key.user_key.as_ref());
                    // the candidate SST's right bound is smaller than the key,
                    // so this level cannot contain it
                    if ord == Ordering::Less {
                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                        continue;
                    }

                    local_stats.non_overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        &level.table_infos[table_info_idx],
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
        }
        stats_guard.local_stats.found_key = false;
        Ok(None)
    }

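    /// Forward range iteration; a thin wrapper around `iter_with_memtable`
    /// with no extra memtable iterator.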
    pub async fn iter(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
    ) -> StorageResult<HummockStorageIterator> {
        self.iter_with_memtable(
            table_key_range,
            epoch,
            table_id,
            read_options,
            read_version_tuple,
            None,
        )
        .await
    }

    pub async fn iter_with_memtable<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockIterator<'b>>,
    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = ForwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = UserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

    pub async fn rev_iter<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = BackwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = BackwardUserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageRevIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

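    /// Shared body of the forward and backward iterators: validates the key
    /// range, then registers imm iterators, staging-SST iterators and
    /// committed-level iterators with the supplied `IteratorFactory`.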
    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        // sanity-check the key range: the left bound must not exceed the right
        {
            fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
                match bound {
                    Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
                    Bound::Unbounded => None,
                }
            }
            let (left, right) = &table_key_range;
            if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
                && right < left
            {
                if cfg!(debug_assertions) {
                    panic!("invalid iter key range: {table_id} {left:?} {right:?}")
                } else {
                    return Err(HummockError::other(format!(
                        "invalid iter key range: {table_id} {left:?} {right:?}"
                    ))
                    .into());
                }
            }
        }

        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.table_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
                && !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                let mut table_infos = prune_nonoverlapping_ssts(
                    &level.table_infos,
                    user_key_range_ref,
                    table_id.table_id(),
                )
                .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    let sstable = self
                        .sstable_store
                        .sstable(&sstable_infos[0], local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    // a single SST needs no concat iterator; add it directly
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        &sstable_infos[0],
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                let table_infos =
                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
                // collect the overlapping SSTs to fetch meta for, in reverse order
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    assert_eq!(sstable_info.object_id, sstable.id);
                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }

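    /// Builds a `ChangeLogIterator` over the table's change log within
    /// `epoch_range`, merging the new-value and old-value SSTs that overlap
    /// with `key_range`.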
    pub async fn iter_log(
        &self,
        version: PinnedVersion,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> HummockResult<ChangeLogIterator> {
        let change_log = {
            let table_change_logs = version.table_change_log_read_lock();
            if let Some(change_log) = table_change_logs.get(&options.table_id) {
                change_log.filter_epoch(epoch_range).cloned().collect_vec()
            } else {
                Vec::new()
            }
        };

        if let Some(max_epoch_change_log) = change_log.last() {
            let (_, max_epoch) = epoch_range;
            if !max_epoch_change_log.epochs().contains(&max_epoch) {
                warn!(
                    max_epoch,
                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
                    table_id = options.table_id.table_id,
                    "max_epoch does not exist"
                );
            }
        }
        let read_options = Arc::new(SstableIteratorReadOptions {
            cache_policy: Default::default(),
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });

        async fn make_iter(
            sstable_infos: impl Iterator<Item = &SstableInfo>,
            sstable_store: &SstableStoreRef,
            read_options: Arc<SstableIteratorReadOptions>,
            local_stat: &mut StoreLocalStatistic,
        ) -> HummockResult<MergeIterator<SstableIterator>> {
            let iters = try_join_all(sstable_infos.map(|sstable_info| {
                let sstable_store = sstable_store.clone();
                let read_options = read_options.clone();
                async move {
                    let mut local_stat = StoreLocalStatistic::default();
                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                    Ok::<_, HummockError>((
                        SstableIterator::new(
                            table_holder,
                            sstable_store,
                            read_options,
                            sstable_info,
                        ),
                        local_stat,
                    ))
                }
            }))
            .await?;
            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
                |(iter, stats)| {
                    local_stat.add(&stats);
                    iter
                },
            )))
        }

        let mut local_stat = StoreLocalStatistic::default();

        let new_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.new_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        let old_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.old_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        ChangeLogIterator::new(
            epoch_range,
            key_range,
            new_value_iter,
            old_value_iter,
            options.table_id,
            IterLocalMetricsGuard::new(
                self.state_store_metrics.clone(),
                options.table_id,
                local_stat,
            ),
        )
        .await
    }

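    /// Nearest-neighbour lookup against the table's vector index. Returns an
    /// empty result when the table has no index (or the HNSW graph is not yet
    /// built), and an error on dimension mismatch.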
    pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
        &'a self,
        version: PinnedVersion,
        table_id: TableId,
        target: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> HummockResult<Vec<O>> {
        let Some(index) = version.vector_indexes.get(&table_id) else {
            return Ok(vec![]);
        };
        if target.dimension() != index.dimension {
            return Err(HummockError::other(format!(
                "target dimension {} does not match index dimension {}",
                target.dimension(),
                index.dimension
            )));
        }
        match &index.inner {
            VectorIndexImpl::Flat(flat) => {
                // flat index: brute-force scan over every vector block
                let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
                for vector_file in &flat.vector_store_info.vector_files {
                    let meta = self.sstable_store.get_vector_file_meta(vector_file).await?;
                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
                        let block = self
                            .sstable_store
                            .get_vector_block(vector_file, i, block_meta)
                            .await?;
                        builder.add(&**block, &on_nearest_item_fn);
                    }
                }
                Ok(builder.finish())
            }
            VectorIndexImpl::HnswFlat(hnsw_flat) => {
                let Some(graph_file) = &hnsw_flat.graph_file else {
                    return Ok(vec![]);
                };

                let graph = self.sstable_store.get_hnsw_graph(graph_file).await?;

                let vector_store =
                    FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
                let (items, _stats) = nearest::<O, M>(
                    &vector_store,
                    &*graph,
                    target,
                    on_nearest_item_fn,
                    options.hnsw_ef_search,
                    options.top_n,
                )
                .await?;
                Ok(items)
            }
        }
    }
}