1use std::cmp::Ordering;
16use std::collections::vec_deque::VecDeque;
17use std::collections::{Bound, HashMap};
18use std::sync::Arc;
19use std::time::Instant;
20
21use bytes::Bytes;
22use futures::future::try_join_all;
23use itertools::Itertools;
24use parking_lot::RwLock;
25use risingwave_common::array::VectorRef;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{TableId, TableOption};
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::MAX_SPILL_TIMES;
30use risingwave_hummock_sdk::key::{
31 FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
32};
33use risingwave_hummock_sdk::key_range::KeyRangeCommon;
34use risingwave_hummock_sdk::sstable_info::SstableInfo;
35use risingwave_hummock_sdk::table_watermark::{
36 TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
37};
38use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
39use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
40use risingwave_pb::hummock::LevelType;
41use sync_point::sync_point;
42use tracing::warn;
43
44use crate::error::StorageResult;
45use crate::hummock::event_handler::LocalInstanceId;
46use crate::hummock::iterator::change_log::ChangeLogIterator;
47use crate::hummock::iterator::{
48 BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
49};
50use crate::hummock::local_version::pinned_version::PinnedVersion;
51use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
52use crate::hummock::sstable_store::SstableStoreRef;
53use crate::hummock::utils::{
54 MemoryTracker, filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts,
55 range_overlap, search_sst_idx,
56};
57use crate::hummock::vector::file::{FileVectorStore, FileVectorStoreCtx};
58use crate::hummock::vector::monitor::{VectorStoreCacheStats, report_hnsw_stat};
59use crate::hummock::{
60 BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
61 HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
62 ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
63 hit_sstable_bloom_filter,
64};
65use crate::mem_table::{
66 ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
67};
68use crate::monitor::{
69 GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
70};
71use crate::store::{
72 OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
73};
74use crate::vector::hnsw::nearest;
75use crate::vector::{MeasureDistanceBuilder, NearestBuilder};
76
/// The committed part of a read version is a pinned hummock version snapshot.
pub type CommittedVersion = PinnedVersion;
78
/// Metadata of ssts that have been uploaded from imms but are not yet committed.
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    // New-value ssts produced by the upload.
    sstable_infos: Vec<LocalSstableInfo>,
    // Ssts holding the old values of the same upload — presumably for the table
    // change log (see `iter_log`); confirm with the uploader.
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    // Epochs covered by this upload, sorted newest -> oldest (asserted in `new`).
    epochs: Vec<HummockEpoch>,
    // For each local instance, the ids of the imms merged into these ssts.
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    // Total in-memory size of the imms that produced these ssts.
    imm_size: usize,
}
96
97impl StagingSstableInfo {
98 pub fn new(
99 sstable_infos: Vec<LocalSstableInfo>,
100 old_value_sstable_infos: Vec<LocalSstableInfo>,
101 epochs: Vec<HummockEpoch>,
102 imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
103 imm_size: usize,
104 ) -> Self {
105 assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
107 Self {
108 sstable_infos,
109 old_value_sstable_infos,
110 epochs,
111 imm_ids,
112 imm_size,
113 }
114 }
115
116 pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
117 &self.sstable_infos
118 }
119
120 pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
121 &self.old_value_sstable_infos
122 }
123
124 pub fn imm_size(&self) -> usize {
125 self.imm_size
126 }
127
128 pub fn epochs(&self) -> &Vec<HummockEpoch> {
129 &self.epochs
130 }
131
132 pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
133 &self.imm_ids
134 }
135}
136
/// Updates applied to a `HummockReadVersion` via `HummockReadVersion::update`.
pub enum VersionUpdate {
    /// Some uploading imms of this instance have become a staging sst.
    Sst(Arc<StagingSstableInfo>),
    /// A newer committed version; staged data it already covers is pruned.
    CommittedSnapshot(CommittedVersion),
    /// A freshly written table watermark at `epoch`.
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}
147
/// Uncommitted data staged locally: imms waiting for upload, imms currently
/// uploading, and ssts already uploaded but not yet committed.
pub struct StagingVersion {
    // Total byte size of `pending_imms`; maintained by `HummockReadVersion`.
    pending_imm_size: usize,
    /// Imms not yet handed to the uploader, oldest first, each paired with its
    /// memory tracker.
    pub pending_imms: Vec<(ImmutableMemtable, MemoryTracker)>,
    /// Imms currently being uploaded; newest at the front (pushed via
    /// `push_front`).
    pub uploading_imms: VecDeque<ImmutableMemtable>,

    /// Uploaded but uncommitted ssts; newest at the front.
    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}
167
impl StagingVersion {
    /// Returns the imms and staging ssts of `table_id` that may overlap with
    /// `table_key_range`, restricted to data visible at `max_epoch_inclusive`.
    ///
    /// The imm iterator yields pending imms newest-first, then the uploading
    /// imms (also newest-first); the sst iterator yields the matching
    /// `SstableInfo`s of the filtered staging ssts.
    pub fn prune_overlap<'a>(
        &'a self,
        max_epoch_inclusive: HummockEpoch,
        table_id: TableId,
        table_key_range: &'a TableKeyRange,
    ) -> (
        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
        impl Iterator<Item = &'a SstableInfo> + 'a,
    ) {
        let (left, right) = table_key_range;
        // Borrowed views of the bounds so the closures below don't capture the
        // owned keys.
        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
        let overlapped_imms = self
            .pending_imms
            .iter()
            .map(|(imm, _)| imm)
            // `pending_imms` is oldest-first, so reverse it before chaining the
            // (already newest-first) uploading imms.
            .rev()
            .chain(self.uploading_imms.iter())
            .filter(move |imm| {
                // Keep imms that have data at or before the read epoch, belong to
                // the queried table, and overlap the key range.
                imm.min_epoch() <= max_epoch_inclusive
                    && imm.table_id == table_id
                    && range_overlap(
                        &(left, right),
                        &imm.start_table_key(),
                        Bound::Included(&imm.end_table_key()),
                    )
            });

        // NOTE(review): `epochs` is kept sorted newest -> oldest (see the assert
        // in `StagingSstableInfo::new`), so `last()` is the *oldest* epoch of the
        // sst despite the `sst_max_epoch` name here — confirm the intended
        // filter semantics.
        let overlapped_ssts = self
            .sst
            .iter()
            .filter(move |staging_sst| {
                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
                sst_max_epoch <= max_epoch_inclusive
            })
            .flat_map(move |staging_sst| {
                // Keep only the ssts that actually cover `table_id` and the range.
                staging_sst
                    .sstable_infos
                    .iter()
                    .map(|sstable| &sstable.sst_info)
                    .filter(move |sstable: &&SstableInfo| {
                        filter_single_sst(sstable, table_id, table_key_range)
                    })
            });
        (overlapped_imms, overlapped_ssts)
    }

    /// True when nothing is staged: no pending imms, no uploading imms and no
    /// staging ssts.
    pub fn is_empty(&self) -> bool {
        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
    }
}
226
/// A per-table-instance read snapshot: locally staged (uncommitted) data plus
/// the committed version, with an optional table watermark index.
pub struct HummockReadVersion {
    table_id: TableId,
    instance_id: LocalInstanceId,

    // Set once by `init`; imms may only be added afterwards.
    is_initialized: bool,

    // Local uncommitted data (imms and staging ssts).
    staging: StagingVersion,

    // The committed version this read version is based on.
    committed: CommittedVersion,

    // A replicated version receives imms via `add_replicated_imm` and never
    // stages uploads of its own (asserted throughout the impl below).
    is_replicated: bool,

    table_watermarks: Option<TableWatermarksIndex>,

    // Vnode partitions owned by this instance.
    vnodes: Arc<Bitmap>,
}
252
impl HummockReadVersion {
    /// Creates a read version for `table_id` / `instance_id` on top of a valid
    /// `committed_version`, optionally marked as replicated.
    ///
    /// Committed watermarks for the table (if any) are snapshotted into a local
    /// `TableWatermarksIndex`. The staging area starts empty and the version
    /// starts uninitialized: `init` must be called before imms are added.
    pub fn new_with_replication_option(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        is_replicated: bool,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        // A read version must be built on a still-valid (pinned) version.
        assert!(committed_version.is_valid());
        Self {
            table_id,
            instance_id,
            table_watermarks: {
                match committed_version.table_watermarks.get(&table_id) {
                    Some(table_watermarks) => Some(TableWatermarksIndex::new_committed(
                        table_watermarks.clone(),
                        committed_version
                            .state_table_info
                            .info()
                            .get(&table_id)
                            .expect("should exist")
                            .committed_epoch,
                        table_watermarks.watermark_type,
                    )),
                    None => None,
                }
            },
            staging: StagingVersion {
                pending_imm_size: 0,
                pending_imms: Vec::default(),
                uploading_imms: VecDeque::default(),
                sst: VecDeque::default(),
            },

            committed: committed_version,

            is_replicated,
            vnodes,
            is_initialized: false,
        }
    }

    /// Creates a non-replicated read version.
    pub fn new(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
    }

    /// The table this read version serves.
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Marks the read version as initialized; must be called exactly once.
    pub fn init(&mut self) {
        assert!(!self.is_initialized);
        self.is_initialized = true;
    }

    /// Appends a locally written imm (with its memory tracker) to the pending
    /// queue. Only valid for initialized, non-replicated versions.
    pub fn add_pending_imm(&mut self, imm: ImmutableMemtable, tracker: MemoryTracker) {
        assert!(self.is_initialized);
        assert!(!self.is_replicated);
        // Batch ids must strictly increase over the newest staged imm, whether
        // it is still pending or already uploading.
        if let Some(item) = self
            .staging
            .pending_imms
            .last()
            .map(|(imm, _)| imm)
            .or_else(|| self.staging.uploading_imms.front())
        {
            assert!(item.batch_id() < imm.batch_id());
        }

        self.staging.pending_imm_size += imm.size();
        self.staging.pending_imms.push((imm, tracker));
    }

    /// Adds an imm received via replication. Replicated versions never hold
    /// pending imms: the imm goes straight to the uploading queue (newest at
    /// the front).
    pub fn add_replicated_imm(&mut self, imm: ImmutableMemtable) {
        assert!(self.is_initialized);
        assert!(self.is_replicated);
        assert!(self.staging.pending_imms.is_empty());
        if let Some(item) = self.staging.uploading_imms.front() {
            assert!(item.batch_id() < imm.batch_id());
        }
        self.staging.uploading_imms.push_front(imm);
    }

    /// Total byte size of the imms currently pending upload.
    pub fn pending_imm_size(&self) -> usize {
        self.staging.pending_imm_size
    }

    /// Moves all pending imms into the uploading queue and returns them (with
    /// their trackers) so the caller can hand them to the uploader.
    pub fn start_upload_pending_imms(&mut self) -> Vec<(ImmutableMemtable, MemoryTracker)> {
        assert!(self.is_initialized);
        assert!(!self.is_replicated);
        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
        // `pending_imms` is oldest-first; `push_front` keeps `uploading_imms`
        // newest-first.
        for (imm, _) in &pending_imms {
            self.staging.uploading_imms.push_front(imm.clone());
        }
        self.staging.pending_imm_size = 0;
        pending_imms
    }

    /// Applies a version update.
    ///
    /// - `Sst`: the uploader finished some imms of this instance; pop them from
    ///   the uploading queue and record the produced staging sst instead.
    /// - `CommittedSnapshot`: adopt a newer committed version and prune staged
    ///   data it already covers.
    /// - `NewTableWatermark`: record a freshly written table watermark.
    pub fn update(&mut self, info: VersionUpdate) {
        match info {
            VersionUpdate::Sst(staging_sst_ref) => {
                {
                    assert!(!self.is_replicated);
                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                        // The upload carried no imm of this instance.
                        warn!(
                            instance_id = self.instance_id,
                            "no related imm in sst input"
                        );
                        return;
                    };

                    // The uploaded imm ids must exactly match the oldest entries
                    // of `uploading_imms`: pop from the back and compare one by
                    // one.
                    for imm_id in imms.iter().rev() {
                        let check_err = match self.staging.uploading_imms.pop_back() {
                            None => Some("empty".to_owned()),
                            Some(prev_imm_id) => {
                                if prev_imm_id.batch_id() == *imm_id {
                                    None
                                } else {
                                    Some(format!(
                                        "miss match id {} {}",
                                        prev_imm_id.batch_id(),
                                        *imm_id
                                    ))
                                }
                            }
                        };
                        assert!(
                            check_err.is_none(),
                            "should be valid staging_sst.size {},
                            staging_sst.imm_ids {:?},
                            staging_sst.epochs {:?},
                            local_pending_imm_ids {:?},
                            local_uploading_imm_ids {:?},
                            instance_id {}
                            check_err {:?}",
                            staging_sst_ref.imm_size,
                            staging_sst_ref.imm_ids,
                            staging_sst_ref.epochs,
                            self.staging
                                .pending_imms
                                .iter()
                                .map(|(imm, _)| imm.batch_id())
                                .collect_vec(),
                            self.staging
                                .uploading_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.instance_id,
                            check_err
                        );
                    }

                    self.staging.sst.push_front(staging_sst_ref);
                }
            }

            VersionUpdate::CommittedSnapshot(committed_version) => {
                if let Some(info) = committed_version
                    .state_table_info
                    .info()
                    .get(&self.table_id)
                {
                    let committed_epoch = info.committed_epoch;
                    if self.is_replicated {
                        // Replicated imms are not uploaded from here; just drop
                        // the ones the committed version now covers.
                        self.staging
                            .uploading_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                        self.staging
                            .pending_imms
                            .retain(|(imm, _)| imm.min_epoch() > committed_epoch);
                    } else {
                        // A non-replicated instance expects all of its staged
                        // imms to be newer than the committed epoch.
                        self.staging
                            .pending_imms
                            .iter()
                            .map(|(imm, _)| imm)
                            .chain(self.staging.uploading_imms.iter())
                            .for_each(|imm| {
                                assert!(
                                    imm.min_epoch() > committed_epoch,
                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
                                    imm.table_id,
                                    imm.min_epoch(),
                                    committed_epoch
                                )
                            });
                    }

                    // Drop staging ssts fully covered by the new committed epoch.
                    // `epochs` is sorted newest -> oldest, so `first()` is the
                    // newest epoch of the sst.
                    self.staging.sst.retain(|sst| {
                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
                    });

                    // No remaining sst may straddle the committed epoch: its
                    // oldest epoch must also be newer than the committed epoch.
                    assert!(self.staging.sst.iter().all(|sst| {
                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
                    }));

                    if let Some(committed_watermarks) =
                        committed_version.table_watermarks.get(&self.table_id)
                    {
                        if let Some(watermark_index) = &mut self.table_watermarks {
                            watermark_index.apply_committed_watermarks(
                                committed_watermarks.clone(),
                                committed_epoch,
                            );
                        } else {
                            self.table_watermarks = Some(TableWatermarksIndex::new_committed(
                                committed_watermarks.clone(),
                                committed_epoch,
                                committed_watermarks.watermark_type,
                            ));
                        }
                    }
                }

                self.committed = committed_version;
            }
            VersionUpdate::NewTableWatermark {
                direction,
                epoch,
                vnode_watermarks,
                watermark_type,
            } => {
                if let Some(watermark_index) = &mut self.table_watermarks {
                    watermark_index.add_epoch_watermark(
                        epoch,
                        Arc::from(vnode_watermarks),
                        direction,
                    );
                } else {
                    self.table_watermarks = Some(TableWatermarksIndex::new(
                        direction,
                        epoch,
                        vnode_watermarks,
                        self.committed.table_committed_epoch(self.table_id),
                        watermark_type,
                    ));
                }
            }
        }
    }

    /// The staged (uncommitted) data.
    pub fn staging(&self) -> &StagingVersion {
        &self.staging
    }

    /// The committed version this read version is based on.
    pub fn committed(&self) -> &CommittedVersion {
        &self.committed
    }

    /// Removes watermarks that regress against the local watermark index, if
    /// one exists.
    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
        if let Some(watermark_index) = &self.table_watermarks {
            watermark_index.filter_regress_watermarks(watermarks)
        }
    }

    /// Latest recorded watermark for `vnode`, if any.
    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.table_watermarks
            .as_ref()
            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
    }

    pub fn is_initialized(&self) -> bool {
        self.is_initialized
    }

    pub fn is_replicated(&self) -> bool {
        self.is_replicated
    }

    /// Replaces the owned vnode bitmap, returning the previous one.
    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        std::mem::replace(&mut self.vnodes, vnodes)
    }

    /// Whether `vnode` is owned by this instance.
    pub fn contains(&self, vnode: VirtualNode) -> bool {
        self.vnodes.is_set(vnode.to_index())
    }

    pub fn vnodes(&self) -> Arc<Bitmap> {
        self.vnodes.clone()
    }
}
558
559pub fn read_filter_for_version(
560 epoch: HummockEpoch,
561 table_id: TableId,
562 mut table_key_range: TableKeyRange,
563 read_version: &RwLock<HummockReadVersion>,
564) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
565 let read_version_guard = read_version.read();
566
567 let committed_version = read_version_guard.committed().clone();
568
569 if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
570 watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
571 }
572
573 let (imm_iter, sst_iter) =
574 read_version_guard
575 .staging()
576 .prune_overlap(epoch, table_id, &table_key_range);
577
578 let imms = imm_iter.cloned().collect();
579 let ssts = sst_iter.cloned().collect();
580
581 Ok((table_key_range, (imms, ssts, committed_version)))
582}
583
/// Cheaply clonable reader that serves `get` / `iter` / `iter_log` / `nearest`
/// against a supplied read-version tuple or pinned version.
#[derive(Clone)]
pub struct HummockVersionReader {
    sstable_store: SstableStoreRef,

    // Shared state-store metrics registry.
    state_store_metrics: Arc<HummockStateStoreMetrics>,
    // Max preload retries for prefetching iterators (see `iter_inner`).
    preload_retry_times: usize,
}
592
593impl HummockVersionReader {
596 pub fn new(
597 sstable_store: SstableStoreRef,
598 state_store_metrics: Arc<HummockStateStoreMetrics>,
599 preload_retry_times: usize,
600 ) -> Self {
601 Self {
602 sstable_store,
603 state_store_metrics,
604 preload_retry_times,
605 }
606 }
607
608 pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
609 &self.state_store_metrics
610 }
611}
612
/// Threshold (seconds) above which fetching sst metas while creating an
/// iterator is reported as slow (see `iter_inner`).
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;
614
615impl HummockVersionReader {
    /// Point lookup of `table_key` at `epoch`.
    ///
    /// Searches, in order: the imms, the uncommitted staging ssts, and finally
    /// the levels of the committed version. The first entry found for the key
    /// decides the result: `None` if it is a delete or its epoch is below
    /// `min_epoch` (retention cut-off), otherwise the output of
    /// `on_key_value_fn` applied to the key and value.
    pub async fn get<'a, O>(
        &'a self,
        table_key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
        table_option: TableOption,
        read_options: ReadOptions,
        read_version_tuple: ReadVersionTuple,
        on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

        // Entries older than the table's retention window count as absent.
        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
        let local_stats = &mut stats_guard.local_stats;
        // Assume found; reset on the not-found path at the end.
        local_stats.found_key = true;

        // 1. Imms (expected newest-to-oldest, see `StagingVersion::prune_overlap`).
        for imm in &imms {
            if imm.max_epoch() < min_epoch {
                // Everything in this imm is outside the retention window.
                continue;
            }

            local_stats.staging_imm_get_count += 1;

            if let Some((data, data_epoch)) = get_from_batch(
                imm,
                TableKey(table_key.as_ref()),
                epoch,
                &read_options,
                local_stats,
            ) {
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    data.into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v.as_ref(),
                            )
                        })
                        .transpose()?
                });
            }
        }

        // Hash of the distribution-key prefix hint, used to consult sst bloom
        // filters before probing them.
        let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| {
            Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.as_raw_id())
        });

        // Query with MAX_SPILL_TIMES as the epoch gap — presumably so entries
        // spilled at the same pure epoch stay visible; TODO(review): confirm.
        let full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(table_key.clone()),
            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
        );
        let single_table_key_range = table_key.clone()..=table_key.clone();

        // 2. Uncommitted staging ssts overlapping the single-key range.
        let pruned_uncommitted_ssts =
            prune_overlapping_ssts(&uncommitted_ssts, table_id, &single_table_key_range);
        for local_sst in pruned_uncommitted_ssts {
            local_stats.staging_sst_get_count += 1;
            if let Some(iter) = get_from_sstable_info(
                self.sstable_store.clone(),
                local_sst,
                full_key.to_ref(),
                &read_options,
                dist_key_hash,
                local_stats,
            )
            .await?
            {
                debug_assert!(iter.is_valid());
                let data_epoch = iter.key().epoch_with_gap;
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    iter.value()
                        .into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v,
                            )
                        })
                        .transpose()?
                });
            }
        }
        // 3. Committed levels, top-down.
        assert!(committed_version.is_valid());
        for level in committed_version.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            match level.level_type {
                LevelType::Overlapping | LevelType::Unspecified => {
                    // Overlapping level: several ssts may contain the key, so
                    // probe every overlapping sst.
                    let sstable_infos = prune_overlapping_ssts(
                        &level.table_infos,
                        table_id,
                        &single_table_key_range,
                    );
                    for sstable_info in sstable_infos {
                        local_stats.overlapping_get_count += 1;
                        if let Some(iter) = get_from_sstable_info(
                            self.sstable_store.clone(),
                            sstable_info,
                            full_key.to_ref(),
                            &read_options,
                            dist_key_hash,
                            local_stats,
                        )
                        .await?
                        {
                            debug_assert!(iter.is_valid());
                            let data_epoch = iter.key().epoch_with_gap;
                            return Ok(if data_epoch.pure_epoch() < min_epoch {
                                None
                            } else {
                                iter.value()
                                    .into_user_value()
                                    .map(|v| {
                                        on_key_value_fn(
                                            FullKey::new_with_gap_epoch(
                                                table_id,
                                                table_key.to_ref(),
                                                data_epoch,
                                            ),
                                            v,
                                        )
                                    })
                                    .transpose()?
                            });
                        }
                    }
                }
                LevelType::Nonoverlapping => {
                    // Non-overlapping level: at most one sst can hold the key.
                    let mut table_info_idx =
                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                    if table_info_idx == 0 {
                        // The key sorts before every sst of this level.
                        continue;
                    }
                    // Step back from the partition point to the candidate sst.
                    // (The saturating_sub is redundant after the `== 0` check
                    // above, but harmless.)
                    table_info_idx = table_info_idx.saturating_sub(1);

                    // Skip ssts that hold no data of this table.
                    if level.table_infos[table_info_idx]
                        .table_ids
                        .binary_search(&table_id)
                        .is_err()
                    {
                        continue;
                    }

                    // If the sst's key range ends before the key, the key lies in
                    // a gap of this level.
                    let ord = level.table_infos[table_info_idx]
                        .key_range
                        .compare_right_with_user_key(full_key.user_key.as_ref());
                    if ord == Ordering::Less {
                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                        continue;
                    }

                    local_stats.non_overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        &level.table_infos[table_info_idx],
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
        }
        // The key was not found anywhere.
        stats_guard.local_stats.found_key = false;
        Ok(None)
    }
830
    /// Forward range iteration; convenience wrapper around `iter_with_memtable`
    /// without an extra local memtable iterator.
    pub async fn iter(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        table_option: TableOption,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
    ) -> StorageResult<HummockStorageIterator> {
        self.iter_with_memtable(
            table_key_range,
            epoch,
            table_id,
            table_option,
            read_options,
            read_version_tuple,
            None,
        )
        .await
    }
851
    /// Builds a forward user iterator over `table_key_range`, merging the
    /// optional local memtable iterator with the imms, the uncommitted ssts and
    /// the committed version, then rewinds it so it is ready for use.
    pub async fn iter_with_memtable<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        table_option: TableOption,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockIterator<'b>>,
    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
        // Convert the table-key range into an owned user-key range for the user
        // iterator.
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = ForwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
        // Collect every needed sub-iterator into the factory.
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        // The user iterator applies epoch visibility and the retention cut-off
        // (`min_epoch`) on top of the merged stream.
        let mut user_iter = UserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }
900
    /// Backward counterpart of `iter_with_memtable`: builds a rewound backward
    /// user iterator over `table_key_range`.
    pub async fn rev_iter<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        table_option: TableOption,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
        // Convert the table-key range into an owned user-key range.
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = BackwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
        // Collect every needed sub-iterator into the (backward) factory.
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        // Applies epoch visibility and the retention cut-off, reading backwards.
        let mut user_iter = BackwardUserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageRevIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }
949
    /// Collects all sub-iterators needed for a range read into `factory`: one
    /// batch iterator per imm, one iterator per relevant staging sst, and per
    /// committed level either a concat iterator (non-overlapping) or individual
    /// sst iterators (overlapping).
    ///
    /// Rejects an inverted key range: panics in debug builds, returns an error
    /// otherwise.
    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        {
            // Validate that left bound <= right bound when both are bounded.
            fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
                match bound {
                    Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
                    Bound::Unbounded => None,
                }
            }
            let (left, right) = &table_key_range;
            if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
                && right < left
            {
                if cfg!(debug_assertions) {
                    panic!("invalid iter key range: {table_id} {left:?} {right:?}")
                } else {
                    return Err(HummockError::other(format!(
                        "invalid iter key range: {table_id} {left:?} {right:?}"
                    ))
                    .into());
                }
            }
        }

        // One in-memory batch iterator per imm.
        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        // Bloom-filter hash of the prefix hint, used to skip ssts that cannot
        // contain matching keys.
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.as_raw_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            // With prefetch enabled, tell the sst iterator how far it will be
            // iterated and allow bounded preload retries.
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        // Staging (uploaded but uncommitted) ssts.
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
                && !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        // Time the committed-level meta fetches so slow reads can be reported.
        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                let mut table_infos =
                    prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref, table_id)
                        .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    // Multiple disjoint ssts: one lazily-opening concat iterator.
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    // Single candidate sst: open it now so the bloom filter can
                    // still rule it out.
                    let sstable = self
                        .sstable_store
                        .sstable(&sstable_infos[0], local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        &sstable_infos[0],
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                // Overlapping level: every overlapping sst gets its own iterator.
                let table_infos =
                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
                // NOTE(review): the pruned ssts are reversed before being added —
                // presumably so newer data is registered first; confirm against
                // `prune_overlapping_ssts`'s ordering.
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    assert_eq!(sstable_info.object_id, sstable.id);
                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        // Report if collecting the sst metas was slow.
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }
1139
    /// Iterates the change log of `options.table_id` over `epoch_range`,
    /// restricted to `key_range`.
    ///
    /// Builds one merge iterator over the new-value ssts and one over the
    /// old-value ssts of the matching change-log entries, then combines them in
    /// a `ChangeLogIterator`.
    pub async fn iter_log(
        &self,
        version: PinnedVersion,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> HummockResult<ChangeLogIterator> {
        // Clone the matching change-log entries out of the version while
        // holding its read lock.
        let change_log = {
            let table_change_logs = version.table_change_log_read_lock();
            if let Some(change_log) = table_change_logs.get(&options.table_id) {
                change_log.filter_epoch(epoch_range).cloned().collect_vec()
            } else {
                Vec::new()
            }
        };

        // Best-effort sanity check: warn (but continue) if the requested upper
        // epoch is not present in the log.
        if let Some(max_epoch_change_log) = change_log.last() {
            let (_, max_epoch) = epoch_range;
            if !max_epoch_change_log.epochs().contains(&max_epoch) {
                warn!(
                    max_epoch,
                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
                    table_id = %options.table_id,
                    "max_epoch does not exist"
                );
            }
        }
        let read_options = Arc::new(SstableIteratorReadOptions {
            cache_policy: Default::default(),
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });

        // Opens every sst concurrently and merges the resulting iterators,
        // folding the per-sst read statistics into `local_stat`.
        async fn make_iter(
            sstable_infos: impl Iterator<Item = &SstableInfo>,
            sstable_store: &SstableStoreRef,
            read_options: Arc<SstableIteratorReadOptions>,
            local_stat: &mut StoreLocalStatistic,
        ) -> HummockResult<MergeIterator<SstableIterator>> {
            let iters = try_join_all(sstable_infos.map(|sstable_info| {
                let sstable_store = sstable_store.clone();
                let read_options = read_options.clone();
                async move {
                    // Per-task statistic, merged into `local_stat` afterwards.
                    let mut local_stat = StoreLocalStatistic::default();
                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                    Ok::<_, HummockError>((
                        SstableIterator::new(
                            table_holder,
                            sstable_store,
                            read_options,
                            sstable_info,
                        ),
                        local_stat,
                    ))
                }
            }))
            .await?;
            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
                |(iter, stats)| {
                    local_stat.add(&stats);
                    iter
                },
            )))
        }

        let mut local_stat = StoreLocalStatistic::default();

        let new_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.new_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        let old_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.old_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        ChangeLogIterator::new(
            epoch_range,
            key_range,
            new_value_iter,
            old_value_iter,
            options.table_id,
            IterLocalMetricsGuard::new(
                self.state_store_metrics.clone(),
                options.table_id,
                local_stat,
            ),
        )
        .await
    }
1242
    /// Vector nearest-neighbour search on the vector index of `table_id`.
    ///
    /// Returns an empty result when the table has no vector index (or its HNSW
    /// index has no graph file yet); errors when the target dimension does not
    /// match the index dimension.
    pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
        &'a self,
        version: PinnedVersion,
        table_id: TableId,
        target: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> HummockResult<Vec<O>> {
        let Some(index) = version.vector_indexes.get(&table_id) else {
            return Ok(vec![]);
        };
        if target.dimension() != index.dimension {
            return Err(HummockError::other(format!(
                "target dimension {} not match index dimension {}",
                target.dimension(),
                index.dimension
            )));
        }
        match &index.inner {
            VectorIndexImpl::Flat(flat) => {
                // Flat index: brute-force scan of every block of every vector
                // file, keeping the top-n nearest items.
                let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
                let mut cache_stat = VectorStoreCacheStats::default();
                for vector_file in &flat.vector_store_info.vector_files {
                    let meta = self
                        .sstable_store
                        .get_vector_file_meta(vector_file, &mut cache_stat)
                        .await?;
                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
                        let block = self
                            .sstable_store
                            .get_vector_block(vector_file, i, block_meta, &mut cache_stat)
                            .await?;
                        builder.add(&**block, &on_nearest_item_fn);
                    }
                }
                cache_stat.report(table_id, "flat", self.stats());
                Ok(builder.finish())
            }
            VectorIndexImpl::HnswFlat(hnsw_flat) => {
                // HNSW index: load the graph and run a graph search with
                // `hnsw_ef_search` candidates.
                let Some(graph_file) = &hnsw_flat.graph_file else {
                    // No graph file yet — treat the index as empty.
                    return Ok(vec![]);
                };

                let mut ctx = FileVectorStoreCtx::default();

                let graph = self
                    .sstable_store
                    .get_hnsw_graph(graph_file, &mut ctx.stats)
                    .await?;

                let vector_store =
                    FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
                let (items, stats) = nearest::<O, M, _>(
                    &vector_store,
                    &mut ctx,
                    &*graph,
                    target,
                    on_nearest_item_fn,
                    options.hnsw_ef_search,
                    options.top_n,
                )
                .await?;
                ctx.stats.report(table_id, "hnsw_read", self.stats());
                report_hnsw_stat(
                    self.stats(),
                    table_id,
                    "hnsw_read",
                    options.top_n,
                    options.hnsw_ef_search,
                    [stats],
                );
                Ok(items)
            }
        }
    }
1318}
1319
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;

    use bytes::Bytes;
    use risingwave_common::catalog::{TableId, TableOption};
    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
    use risingwave_hummock_sdk::key::{FullKey, TableKey};
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, Levels, OverlappingLevel};
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersion;
    use risingwave_pb::hummock::{PbHummockVersion, PbLevelType, StateTableInfoDelta};
    use tokio::sync::mpsc::unbounded_channel;

    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::local_version::pinned_version::PinnedVersion;
    use crate::hummock::store::version::HummockVersionReader;
    use crate::monitor::HummockStateStoreMetrics;
    use crate::store::ReadOptions;

    /// A point get for a table id that is absent from an SST's `table_ids`
    /// must skip that SST entirely, even though the SST's key range overlaps
    /// the queried table.
    #[tokio::test]
    async fn test_get_skips_sst_by_table_id_filter() {
        let query_table_id = TableId::new(100);
        let epoch: u64 = (31 * 1000) << 16;
        let group_id = StaticCompactionGroupId::StateDefault;

        // The SST's key range spans tables 50..=150, but only tables 50 and
        // 150 are listed in `table_ids` — the queried table 100 is not.
        let sst = SstableInfoInner {
            sst_id: 1.into(),
            object_id: 1.into(),
            key_range: KeyRange {
                left: Bytes::from(
                    FullKey::for_test(TableId::new(50), b"aaa".to_vec(), epoch).encode(),
                ),
                right: Bytes::from(
                    FullKey::for_test(TableId::new(150), b"zzz".to_vec(), epoch).encode(),
                ),
                right_exclusive: false,
            },
            table_ids: vec![TableId::new(50), TableId::new(150)],
            file_size: 1024,
            ..Default::default()
        }
        .into();

        // Place the SST in a non-overlapping L1 so the point-get path prunes
        // by table id rather than by sub-level overlap.
        let l1 = Level {
            level_idx: 1,
            level_type: PbLevelType::Nonoverlapping,
            table_infos: vec![sst],
            total_file_size: 0,
            sub_level_id: 0,
            uncompressed_file_size: 0,
            vnode_partition_count: 0,
        };

        #[allow(deprecated)]
        let levels = Levels {
            levels: vec![l1],
            l0: OverlappingLevel::default(),
            group_id,
            parent_group_id: group_id,
            member_table_ids: vec![],
            compaction_group_version_id: 0,
        };

        let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion {
            id: 1u64.into(),
            ..Default::default()
        });
        version.levels.insert(group_id, levels);
        // Register the queried table so the read path considers the version
        // committed for it at `epoch`.
        version.state_table_info.apply_delta(
            &HashMap::from([(
                query_table_id,
                StateTableInfoDelta {
                    committed_epoch: epoch,
                    compaction_group_id: group_id,
                },
            )]),
            &HashSet::new(),
        );

        let pinned_version = PinnedVersion::new(version, unbounded_channel().0);
        let reader = HummockVersionReader::new(
            mock_sstable_store().await,
            Arc::new(HummockStateStoreMetrics::unused()),
            0,
        );

        let result = reader
            .get(
                TableKey(Bytes::from("test_key")),
                epoch,
                query_table_id,
                TableOption::default(),
                ReadOptions::default(),
                (vec![], vec![], pinned_version),
                |_key, _value| Ok(()),
            )
            .await
            .unwrap();

        // The SST was filtered out by table id, so nothing can be found.
        assert!(result.is_none());
    }
}