1use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::collections::vec_deque::VecDeque;
18use std::ops::Bound::Included;
19use std::sync::Arc;
20use std::time::Instant;
21
22use bytes::Bytes;
23use futures::future::try_join_all;
24use itertools::Itertools;
25use parking_lot::RwLock;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::TableId;
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::MAX_SPILL_TIMES;
30use risingwave_hummock_sdk::key::{
31 FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
32};
33use risingwave_hummock_sdk::key_range::KeyRangeCommon;
34use risingwave_hummock_sdk::sstable_info::SstableInfo;
35use risingwave_hummock_sdk::table_watermark::{
36 TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
37};
38use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
39use risingwave_pb::hummock::LevelType;
40use sync_point::sync_point;
41use tracing::warn;
42
43use crate::error::StorageResult;
44use crate::hummock::event_handler::LocalInstanceId;
45use crate::hummock::iterator::change_log::ChangeLogIterator;
46use crate::hummock::iterator::{
47 BackwardUserIterator, IteratorFactory, MergeIterator, UserIterator,
48};
49use crate::hummock::local_version::pinned_version::PinnedVersion;
50use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
51use crate::hummock::sstable_store::SstableStoreRef;
52use crate::hummock::utils::{
53 filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
54 search_sst_idx,
55};
56use crate::hummock::{
57 BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
58 HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
59 ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
60 hit_sstable_bloom_filter,
61};
62use crate::mem_table::{
63 ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
64};
65use crate::monitor::{
66 GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
67};
68use crate::store::{ReadLogOptions, ReadOptions, StateStoreKeyedRow, gen_min_epoch};
69
/// The committed part of a read version: a pinned Hummock version.
pub type CommittedVersion = PinnedVersion;
71
/// Uncommitted SSTs built from one or more staging imms, together with the bookkeeping a
/// read version needs to swap those imms out for the SSTs.
///
/// Construct with [`StagingSstableInfo::new`], which requires `epochs` to be sorted
/// newest-first.
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    /// SSTs holding the new data; their `sst_info`s are what `StagingVersion::prune_overlap`
    /// hands to readers.
    sstable_infos: Vec<LocalSstableInfo>,
    /// Old-value SSTs produced alongside `sstable_infos` (only exposed through the getter in
    /// this file).
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    /// Epochs covered by these SSTs, sorted from newest to oldest (asserted in `new`).
    epochs: Vec<HummockEpoch>,
    /// Per local instance, the ids of the imms these SSTs replace. When the SST is staged,
    /// `HummockReadVersion::update` pops exactly these imms from that instance's staging queue.
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    /// Size associated with the replaced imms (printed as `staging_sst.size` in diagnostics);
    /// presumably bytes — TODO confirm.
    imm_size: usize,
}
89
90impl StagingSstableInfo {
91 pub fn new(
92 sstable_infos: Vec<LocalSstableInfo>,
93 old_value_sstable_infos: Vec<LocalSstableInfo>,
94 epochs: Vec<HummockEpoch>,
95 imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
96 imm_size: usize,
97 ) -> Self {
98 assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
100 Self {
101 sstable_infos,
102 old_value_sstable_infos,
103 epochs,
104 imm_ids,
105 imm_size,
106 }
107 }
108
109 pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
110 &self.sstable_infos
111 }
112
113 pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
114 &self.old_value_sstable_infos
115 }
116
117 pub fn imm_size(&self) -> usize {
118 self.imm_size
119 }
120
121 pub fn epochs(&self) -> &Vec<HummockEpoch> {
122 &self.epochs
123 }
124
125 pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
126 &self.imm_ids
127 }
128}
129
/// A unit of uncommitted data that can be added to a read version's staging area.
#[derive(Clone)]
pub enum StagingData {
    /// An immutable mem-table (imm); pushed to the front of the staging imm queue.
    ImmMem(ImmutableMemtable),
    /// SSTs built from previously staged imms; staging it removes those imms from the
    /// imm queue and pushes the SST to the front of the staging SST queue.
    Sst(Arc<StagingSstableInfo>),
}
135
/// An update applied to a [`HummockReadVersion`] through `HummockReadVersion::update`.
pub enum VersionUpdate {
    /// Adds uncommitted data to the staging area.
    Staging(StagingData),
    /// Replaces the committed version. Staging data already covered by the table's new
    /// committed epoch is dropped, or asserted absent for non-replicated read versions.
    CommittedSnapshot(CommittedVersion),
    /// Adds table watermarks written at `epoch`. Only `WatermarkSerdeType::PkPrefix` is
    /// accepted; any other type panics in `update`.
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}
147
/// Uncommitted data of a read version that is not yet part of the committed version.
#[derive(Clone)]
pub struct StagingVersion {
    /// Staging imms; new imms are pushed to the front, so the queue is ordered newest-first
    /// and the back holds the oldest imm.
    pub imm: VecDeque<ImmutableMemtable>,

    /// Staging SSTs; new entries are pushed to the front, so the queue is ordered newest-first.
    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}
158
159impl StagingVersion {
160 pub fn prune_overlap<'a>(
163 &'a self,
164 max_epoch_inclusive: HummockEpoch,
165 table_id: TableId,
166 table_key_range: &'a TableKeyRange,
167 ) -> (
168 impl Iterator<Item = &'a ImmutableMemtable> + 'a,
169 impl Iterator<Item = &'a SstableInfo> + 'a,
170 ) {
171 let (left, right) = table_key_range;
172 let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
173 let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
174 let overlapped_imms = self.imm.iter().filter(move |imm| {
175 imm.min_epoch() <= max_epoch_inclusive
177 && imm.table_id == table_id
178 && range_overlap(
179 &(left, right),
180 &imm.start_table_key(),
181 Included(&imm.end_table_key()),
182 )
183 });
184
185 let overlapped_ssts = self
187 .sst
188 .iter()
189 .filter(move |staging_sst| {
190 let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
191 sst_max_epoch <= max_epoch_inclusive
192 })
193 .flat_map(move |staging_sst| {
194 staging_sst
197 .sstable_infos
198 .iter()
199 .map(|sstable| &sstable.sst_info)
200 .filter(move |sstable: &&SstableInfo| {
201 filter_single_sst(sstable, table_id, table_key_range)
202 })
203 });
204 (overlapped_imms, overlapped_ssts)
205 }
206
207 pub fn is_empty(&self) -> bool {
208 self.imm.is_empty() && self.sst.is_empty()
209 }
210}
211
/// Per-table, per-local-instance view of readable data: uncommitted staging data layered on
/// top of a committed version, plus an optional table-watermark index.
#[derive(Clone)]
pub struct HummockReadVersion {
    /// Table served by this read version.
    table_id: TableId,
    /// Local instance that owns this read version; used to pick its imm ids out of staging SSTs.
    instance_id: LocalInstanceId,

    /// Uncommitted imms and SSTs, newest first.
    staging: StagingVersion,

    /// Latest committed version seen by this read version.
    committed: CommittedVersion,

    /// When `true`, a committed snapshot silently drops staging imms at or below the committed
    /// epoch; when `false`, such imms are asserted not to exist.
    is_replicated: bool,

    /// Index of pk-prefix table watermarks, used to rewrite read ranges in
    /// `read_filter_for_version`. `None` until a pk-prefix watermark is known.
    table_watermarks: Option<TableWatermarksIndex>,

    /// Vnodes assigned to this read version; queried via `contains`.
    vnodes: Arc<Bitmap>,
}
236
237impl HummockReadVersion {
238 pub fn new_with_replication_option(
239 table_id: TableId,
240 instance_id: LocalInstanceId,
241 committed_version: CommittedVersion,
242 is_replicated: bool,
243 vnodes: Arc<Bitmap>,
244 ) -> Self {
245 assert!(committed_version.is_valid());
249 Self {
250 table_id,
251 instance_id,
252 table_watermarks: {
253 match committed_version.table_watermarks.get(&table_id) {
254 Some(table_watermarks) => match table_watermarks.watermark_type {
255 WatermarkSerdeType::PkPrefix => Some(TableWatermarksIndex::new_committed(
256 table_watermarks.clone(),
257 committed_version
258 .state_table_info
259 .info()
260 .get(&table_id)
261 .expect("should exist")
262 .committed_epoch,
263 )),
264
265 WatermarkSerdeType::NonPkPrefix => None, },
267 None => None,
268 }
269 },
270 staging: StagingVersion {
271 imm: VecDeque::default(),
272 sst: VecDeque::default(),
273 },
274
275 committed: committed_version,
276
277 is_replicated,
278 vnodes,
279 }
280 }
281
282 pub fn new(
283 table_id: TableId,
284 instance_id: LocalInstanceId,
285 committed_version: CommittedVersion,
286 vnodes: Arc<Bitmap>,
287 ) -> Self {
288 Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
289 }
290
291 pub fn table_id(&self) -> TableId {
292 self.table_id
293 }
294
295 pub fn update(&mut self, info: VersionUpdate) {
305 match info {
306 VersionUpdate::Staging(staging) => match staging {
307 StagingData::ImmMem(imm) => {
310 if let Some(item) = self.staging.imm.front() {
311 debug_assert!(item.batch_id() < imm.batch_id());
313 }
314
315 self.staging.imm.push_front(imm)
316 }
317 StagingData::Sst(staging_sst_ref) => {
318 let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
319 warn!(
320 instance_id = self.instance_id,
321 "no related imm in sst input"
322 );
323 return;
324 };
325
326 for imm_id in imms.iter().rev() {
328 let check_err = match self.staging.imm.pop_back() {
329 None => Some("empty".to_owned()),
330 Some(prev_imm_id) => {
331 if prev_imm_id.batch_id() == *imm_id {
332 None
333 } else {
334 Some(format!(
335 "miss match id {} {}",
336 prev_imm_id.batch_id(),
337 *imm_id
338 ))
339 }
340 }
341 };
342 assert!(
343 check_err.is_none(),
344 "should be valid staging_sst.size {},
345 staging_sst.imm_ids {:?},
346 staging_sst.epochs {:?},
347 local_imm_ids {:?},
348 instance_id {}
349 check_err {:?}",
350 staging_sst_ref.imm_size,
351 staging_sst_ref.imm_ids,
352 staging_sst_ref.epochs,
353 self.staging
354 .imm
355 .iter()
356 .map(|imm| imm.batch_id())
357 .collect_vec(),
358 self.instance_id,
359 check_err
360 );
361 }
362
363 self.staging.sst.push_front(staging_sst_ref);
364 }
365 },
366
367 VersionUpdate::CommittedSnapshot(committed_version) => {
368 if let Some(info) = committed_version
369 .state_table_info
370 .info()
371 .get(&self.table_id)
372 {
373 let committed_epoch = info.committed_epoch;
374 self.staging.imm.retain(|imm| {
375 if self.is_replicated {
376 imm.min_epoch() > committed_epoch
377 } else {
378 assert!(imm.min_epoch() > committed_epoch);
379 true
380 }
381 });
382
383 self.staging.sst.retain(|sst| {
384 sst.epochs.first().expect("epochs not empty") > &committed_epoch
385 });
386
387 assert!(self.staging.sst.iter().all(|sst| {
389 sst.epochs.last().expect("epochs not empty") > &committed_epoch
390 }));
391
392 if let Some(committed_watermarks) =
393 self.committed.table_watermarks.get(&self.table_id)
394 {
395 if let Some(watermark_index) = &mut self.table_watermarks {
396 watermark_index.apply_committed_watermarks(
397 committed_watermarks.clone(),
398 committed_epoch,
399 );
400 } else {
401 self.table_watermarks = Some(TableWatermarksIndex::new_committed(
402 committed_watermarks.clone(),
403 committed_epoch,
404 ));
405 }
406 }
407 }
408
409 self.committed = committed_version;
410 }
411 VersionUpdate::NewTableWatermark {
412 direction,
413 epoch,
414 vnode_watermarks,
415 watermark_type,
416 } => {
417 assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type);
418 if let Some(watermark_index) = &mut self.table_watermarks {
419 watermark_index.add_epoch_watermark(
420 epoch,
421 Arc::from(vnode_watermarks),
422 direction,
423 );
424 } else {
425 self.table_watermarks = Some(TableWatermarksIndex::new(
426 direction,
427 epoch,
428 vnode_watermarks,
429 self.committed.table_committed_epoch(self.table_id),
430 ));
431 }
432 }
433 }
434 }
435
436 pub fn staging(&self) -> &StagingVersion {
437 &self.staging
438 }
439
440 pub fn committed(&self) -> &CommittedVersion {
441 &self.committed
442 }
443
444 pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
449 if let Some(watermark_index) = &self.table_watermarks {
450 watermark_index.filter_regress_watermarks(watermarks)
451 }
452 }
453
454 pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
455 self.table_watermarks
456 .as_ref()
457 .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
458 }
459
460 pub fn is_replicated(&self) -> bool {
461 self.is_replicated
462 }
463
464 pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
465 std::mem::replace(&mut self.vnodes, vnodes)
466 }
467
468 pub fn contains(&self, vnode: VirtualNode) -> bool {
469 self.vnodes.is_set(vnode.to_index())
470 }
471
472 pub fn vnodes(&self) -> Arc<Bitmap> {
473 self.vnodes.clone()
474 }
475}
476
477pub fn read_filter_for_version(
478 epoch: HummockEpoch,
479 table_id: TableId,
480 mut table_key_range: TableKeyRange,
481 read_version: &RwLock<HummockReadVersion>,
482) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
483 let read_version_guard = read_version.read();
484
485 let committed_version = read_version_guard.committed().clone();
486
487 if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
488 watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
489 }
490
491 let (imm_iter, sst_iter) =
492 read_version_guard
493 .staging()
494 .prune_overlap(epoch, table_id, &table_key_range);
495
496 let imms = imm_iter.cloned().collect();
497 let ssts = sst_iter.cloned().collect();
498
499 Ok((table_key_range, (imms, ssts, committed_version)))
500}
501
/// Serves point gets and iterators over a read-version snapshot (staging imms, staging SSTs
/// and a committed version), fetching SST data from `sstable_store`.
#[derive(Clone)]
pub struct HummockVersionReader {
    /// Store used to load SST metadata and blocks.
    sstable_store: SstableStoreRef,

    /// Metrics that per-read local statistics are reported into.
    state_store_metrics: Arc<HummockStateStoreMetrics>,

    /// Copied into `max_preload_retry_times` of the SST read options when a read requests
    /// prefetching (see `iter_inner`).
    preload_retry_times: usize,
}
510
511impl HummockVersionReader {
514 pub fn new(
515 sstable_store: SstableStoreRef,
516 state_store_metrics: Arc<HummockStateStoreMetrics>,
517 preload_retry_times: usize,
518 ) -> Self {
519 Self {
520 sstable_store,
521 state_store_metrics,
522 preload_retry_times,
523 }
524 }
525
526 pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
527 &self.state_store_metrics
528 }
529}
530
/// If fetching committed-SST metadata while building an iterator takes longer than this many
/// seconds, `iter_inner` logs a warning and records the meta-block cache misses in the
/// `iter_slow_fetch_meta_cache_unhits` metric.
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;
532
impl HummockVersionReader {
    /// Point-gets `table_key` as of `epoch` from `read_version_tuple`.
    ///
    /// Sources are searched newest first: staging imms, then staging (uncommitted) SSTs,
    /// then the committed version level by level. The first hit wins. A hit whose epoch is
    /// older than the retention-derived `min_epoch` yields `Ok(None)`. A hit whose
    /// `into_user_value()` is `None` (presumably a delete — TODO confirm) also yields
    /// `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `get_from_sstable_info`.
    ///
    /// # Panics
    ///
    /// Panics if the committed version is not valid.
    pub async fn get(
        &self,
        table_key: TableKey<Bytes>,
        epoch: u64,
        read_options: ReadOptions,
        read_version_tuple: ReadVersionTuple,
    ) -> StorageResult<Option<StateStoreKeyedRow>> {
        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        let mut stats_guard =
            GetLocalMetricsGuard::new(self.state_store_metrics.clone(), read_options.table_id);
        let local_stats = &mut stats_guard.local_stats;
        // Assume a hit; only the fall-through path at the end resets this to `false`.
        local_stats.found_key = true;

        // 1. Staging imms.
        for imm in &imms {
            // Skip imms whose newest data is older than the retention bound.
            if imm.max_epoch() < min_epoch {
                continue;
            }

            local_stats.staging_imm_get_count += 1;

            if let Some((data, data_epoch)) = get_from_batch(
                imm,
                TableKey(table_key.as_ref()),
                epoch,
                &read_options,
                local_stats,
            ) {
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    data.into_user_value().map(|v| {
                        (
                            FullKey::new_with_gap_epoch(
                                read_options.table_id,
                                table_key.clone(),
                                data_epoch,
                            ),
                            v,
                        )
                    })
                });
            }
        }

        // Bloom-filter hash of the prefix hint, when one is supplied; shared by all SST probes.
        let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| {
            Sstable::hash_for_bloom_filter(dist_key.as_ref(), read_options.table_id.table_id())
        });

        // Search key used for every SST probe below (staging and committed).
        let full_key = FullKey::new_with_gap_epoch(
            read_options.table_id,
            TableKey(table_key.clone()),
            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
        );
        // 2. Staging (uncommitted) SSTs.
        for local_sst in &uncommitted_ssts {
            local_stats.staging_sst_get_count += 1;
            if let Some((data, data_epoch)) = get_from_sstable_info(
                self.sstable_store.clone(),
                local_sst,
                full_key.to_ref(),
                &read_options,
                dist_key_hash,
                local_stats,
            )
            .await?
            {
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    data.into_user_value().map(|v| {
                        (
                            FullKey::new_with_gap_epoch(
                                read_options.table_id,
                                table_key.clone(),
                                data_epoch,
                            ),
                            v,
                        )
                    })
                });
            }
        }
        // 3. Committed version, level by level.
        let single_table_key_range = table_key.clone()..=table_key.clone();
        assert!(committed_version.is_valid());
        for level in committed_version.levels(read_options.table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            match level.level_type {
                LevelType::Overlapping | LevelType::Unspecified => {
                    // Probe every SST that `prune_overlapping_ssts` keeps for the single key.
                    let sstable_infos = prune_overlapping_ssts(
                        &level.table_infos,
                        read_options.table_id,
                        &single_table_key_range,
                    );
                    for sstable_info in sstable_infos {
                        local_stats.overlapping_get_count += 1;
                        if let Some((data, data_epoch)) = get_from_sstable_info(
                            self.sstable_store.clone(),
                            sstable_info,
                            full_key.to_ref(),
                            &read_options,
                            dist_key_hash,
                            local_stats,
                        )
                        .await?
                        {
                            return Ok(if data_epoch.pure_epoch() < min_epoch {
                                None
                            } else {
                                data.into_user_value().map(|v| {
                                    (
                                        FullKey::new_with_gap_epoch(
                                            read_options.table_id,
                                            table_key.clone(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                            });
                        }
                    }
                }
                LevelType::Nonoverlapping => {
                    // Only one SST of this level is probed. An index of 0 from `search_sst_idx`
                    // is treated as "no candidate"; otherwise the SST just before it is used.
                    let mut table_info_idx =
                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                    if table_info_idx == 0 {
                        continue;
                    }
                    table_info_idx = table_info_idx.saturating_sub(1);
                    let ord = level.table_infos[table_info_idx]
                        .key_range
                        .compare_right_with_user_key(full_key.user_key.as_ref());
                    if ord == Ordering::Less {
                        // `Less` is treated as the key lying beyond this SST's right bound,
                        // i.e. in a gap between SSTs.
                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                        continue;
                    }

                    local_stats.non_overlapping_get_count += 1;
                    if let Some((data, data_epoch)) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        &level.table_infos[table_info_idx],
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            data.into_user_value().map(|v| {
                                (
                                    FullKey::new_with_gap_epoch(
                                        read_options.table_id,
                                        table_key.clone(),
                                        data_epoch,
                                    ),
                                    v,
                                )
                            })
                        });
                    }
                }
            }
        }
        stats_guard.local_stats.found_key = false;
        Ok(None)
    }

    /// Builds a rewound forward iterator over `table_key_range` as of `epoch` without a
    /// mem-table iterator; same as [`Self::iter_with_memtable`] with `None`.
    pub async fn iter(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
    ) -> StorageResult<HummockStorageIterator> {
        self.iter_with_memtable(
            table_key_range,
            epoch,
            read_options,
            read_version_tuple,
            None,
        )
        .await
    }

    /// Builds a forward user iterator over `table_key_range` as of `epoch`.
    ///
    /// The iterator merges the optional `memtable_iter` with the sources added by
    /// [`Self::iter_inner`]. It is rewound before being returned.
    ///
    /// # Errors
    ///
    /// Propagates errors from `iter_inner` and from rewinding.
    pub async fn iter_with_memtable<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockIterator<'b>>,
    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
        // Derive the user-key bounds up front: `table_key_range` is moved into `iter_inner`.
        let user_key_range_ref = bound_table_key_range(read_options.table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = ForwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let table_id = read_options.table_id;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = UserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

    /// Backward counterpart of [`Self::iter_with_memtable`].
    ///
    /// It uses a `BackwardIteratorFactory` and a `BackwardUserIterator`, and the returned
    /// iterator has already been rewound.
    pub async fn rev_iter<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
        // Derive the user-key bounds up front: `table_key_range` is moved into `iter_inner`.
        let user_key_range_ref = bound_table_key_range(read_options.table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = BackwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let table_id = read_options.table_id;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = BackwardUserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageRevIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

    /// Feeds every source relevant to `table_key_range` into `factory`.
    ///
    /// Sources are added in this order: all `imms`, then the staging SSTs, then the SSTs of
    /// each committed level. When `read_options.prefix_hint` is set, SSTs that fail
    /// `hit_sstable_bloom_filter` are skipped; multi-SST non-overlapping levels are the one
    /// exception, since they are added as a concat iterator without a bloom check. Counters in
    /// `local_stats` are updated along the way. A warning and a metric are emitted if loading
    /// committed-SST metadata exceeds `SLOW_ITER_FETCH_META_DURATION_SECOND`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `sstable_store.sstable(..)`.
    ///
    /// # Panics
    ///
    /// Panics if a loaded overlapping-level SST's id differs from its `object_id`.
    pub async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        let user_key_range = bound_table_key_range(read_options.table_id, &table_key_range);
        // Borrowed form of the bounds, as taken by the bloom-filter and pruning helpers.
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        // Bloom-filter hash of the prefix hint, when one is supplied.
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, read_options.table_id.table_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        // Staging (uncommitted) SSTs.
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref() {
                if !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                ) {
                    continue;
                }
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        // Times only the committed-level metadata fetches below.
        let timer = Instant::now();

        for level in committed.levels(read_options.table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                let mut table_infos = prune_nonoverlapping_ssts(
                    &level.table_infos,
                    user_key_range_ref,
                    read_options.table_id.table_id(),
                )
                .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    // Several candidate SSTs: add them as one concat iterator (no bloom check).
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    let sstable = self
                        .sstable_store
                        .sstable(&sstable_infos[0], local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref() {
                        if !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        ) {
                            continue;
                        }
                    }
                    // A single SST needs no concat iterator; it is added through the
                    // overlapping-iterator slot but still counted as non-overlapping.
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        &sstable_infos[0],
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                let table_infos = prune_overlapping_ssts(
                    &level.table_infos,
                    read_options.table_id,
                    &table_key_range,
                );
                // Overlapping level: visit the pruned SSTs in reverse order.
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    assert_eq!(sstable_info.object_id, sstable.id);
                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref() {
                        if !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        ) {
                            continue;
                        }
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = read_options.table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }

    /// Builds a change-log iterator for `options.table_id` over `epoch_range` and `key_range`
    /// from the table change logs of `version`.
    ///
    /// The new-value and old-value SSTs of the matching change logs are opened concurrently
    /// and merged into two `MergeIterator`s. A warning is logged if the last matching change
    /// log does not contain the upper epoch of `epoch_range`.
    ///
    /// # Errors
    ///
    /// Propagates errors from loading SSTs and from `ChangeLogIterator::new`.
    pub async fn iter_log(
        &self,
        version: PinnedVersion,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> HummockResult<ChangeLogIterator> {
        // Copy out the relevant change logs so the read lock is released right away.
        let change_log = {
            let table_change_logs = version.table_change_log_read_lock();
            if let Some(change_log) = table_change_logs.get(&options.table_id) {
                change_log.filter_epoch(epoch_range).cloned().collect_vec()
            } else {
                Vec::new()
            }
        };

        if let Some(max_epoch_change_log) = change_log.last() {
            let (_, max_epoch) = epoch_range;
            if !max_epoch_change_log.epochs.contains(&max_epoch) {
                warn!(
                    max_epoch,
                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs.iter()).collect_vec(),
                    table_id = options.table_id.table_id,
                    "max_epoch does not exist"
                );
            }
        }
        let read_options = Arc::new(SstableIteratorReadOptions {
            cache_policy: Default::default(),
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });

        /// Loads all `sstable_infos` concurrently and merges them into one iterator, folding
        /// each load's statistics into `local_stat`.
        async fn make_iter(
            sstable_infos: impl Iterator<Item = &SstableInfo>,
            sstable_store: &SstableStoreRef,
            read_options: Arc<SstableIteratorReadOptions>,
            local_stat: &mut StoreLocalStatistic,
        ) -> HummockResult<MergeIterator<SstableIterator>> {
            let iters = try_join_all(sstable_infos.map(|sstable_info| {
                let sstable_store = sstable_store.clone();
                let read_options = read_options.clone();
                async move {
                    // Each concurrent load gets its own statistics, merged afterwards.
                    let mut local_stat = StoreLocalStatistic::default();
                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                    Ok::<_, HummockError>((
                        SstableIterator::new(
                            table_holder,
                            sstable_store,
                            read_options,
                            sstable_info,
                        ),
                        local_stat,
                    ))
                }
            }))
            .await?;
            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
                |(iter, stats)| {
                    local_stat.add(&stats);
                    iter
                },
            )))
        }

        let mut local_stat = StoreLocalStatistic::default();

        let new_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.new_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        let old_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.old_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        ChangeLogIterator::new(
            epoch_range,
            key_range,
            new_value_iter,
            old_value_iter,
            options.table_id,
            IterLocalMetricsGuard::new(
                self.state_store_metrics.clone(),
                options.table_id,
                local_stat,
            ),
        )
        .await
    }
}