1use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::collections::vec_deque::VecDeque;
18use std::ops::Bound::Included;
19use std::sync::Arc;
20use std::time::Instant;
21
22use bytes::Bytes;
23use futures::future::try_join_all;
24use itertools::Itertools;
25use parking_lot::RwLock;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::TableId;
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::MAX_SPILL_TIMES;
30use risingwave_hummock_sdk::key::{
31 FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
32};
33use risingwave_hummock_sdk::key_range::KeyRangeCommon;
34use risingwave_hummock_sdk::sstable_info::SstableInfo;
35use risingwave_hummock_sdk::table_watermark::{
36 PkPrefixTableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
37};
38use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
39use risingwave_pb::hummock::LevelType;
40use sync_point::sync_point;
41use tracing::warn;
42
43use crate::error::StorageResult;
44use crate::hummock::event_handler::LocalInstanceId;
45use crate::hummock::iterator::change_log::ChangeLogIterator;
46use crate::hummock::iterator::{
47 BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
48};
49use crate::hummock::local_version::pinned_version::PinnedVersion;
50use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
51use crate::hummock::sstable_store::SstableStoreRef;
52use crate::hummock::utils::{
53 filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
54 search_sst_idx,
55};
56use crate::hummock::{
57 BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
58 HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
59 ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
60 hit_sstable_bloom_filter,
61};
62use crate::mem_table::{
63 ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
64};
65use crate::monitor::{
66 GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
67};
68use crate::store::{ReadLogOptions, ReadOptions, gen_min_epoch};
69
/// Name under which this module refers to a [`PinnedVersion`]: it is the type of the
/// `committed` field of `HummockReadVersion` and of the committed part of the version
/// tuples consumed by `HummockVersionReader`.
pub type CommittedVersion = PinnedVersion;
71
/// Describes one batch of staging SSTs together with the epochs and the immutable
/// memtable ids (grouped per local instance) recorded for it.
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    /// SSTs of this batch; `StagingVersion::prune_overlap` yields their `sst_info`.
    sstable_infos: Vec<LocalSstableInfo>,
    /// NOTE(review): presumably SSTs holding the old values of the same writes — this file
    /// only stores and returns them; confirm against the producer.
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    /// Epochs of the batch. `new` asserts that they are in non-increasing order, i.e. the
    /// first element is the largest epoch and the last one the smallest.
    epochs: Vec<HummockEpoch>,
    /// Imm ids of the batch keyed by local instance id. `HummockReadVersion::update` uses
    /// the entry of its own instance to remove the matching imms from its staging queue.
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    /// NOTE(review): presumably the accumulated size of the imms behind this batch; unit
    /// and exact meaning are not visible here — confirm against the producer.
    imm_size: usize,
}
89
impl StagingSstableInfo {
    /// Bundles the given parts into a `StagingSstableInfo` without transforming them.
    ///
    /// # Panics
    ///
    /// Panics if `epochs` is not sorted in non-increasing order (every epoch must be
    /// greater than or equal to the one that follows it).
    pub fn new(
        sstable_infos: Vec<LocalSstableInfo>,
        old_value_sstable_infos: Vec<LocalSstableInfo>,
        epochs: Vec<HummockEpoch>,
        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
        imm_size: usize,
    ) -> Self {
        // The comparator receives two neighbouring elements and accepts them when the later
        // one is not larger than the earlier one, so this enforces newest-epoch-first order.
        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
        Self {
            sstable_infos,
            old_value_sstable_infos,
            epochs,
            imm_ids,
            imm_size,
        }
    }

    /// Returns the SSTs of this batch.
    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.sstable_infos
    }

    /// Returns the SSTs stored in `old_value_sstable_infos`.
    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.old_value_sstable_infos
    }

    /// Returns the `imm_size` value this batch was created with.
    pub fn imm_size(&self) -> usize {
        self.imm_size
    }

    /// Returns the epochs of this batch, newest first (see the assertion in `new`).
    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.epochs
    }

    /// Returns the imm ids of this batch, keyed by local instance id.
    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
        &self.imm_ids
    }
}
129
/// A piece of not-yet-committed data that can be added to a read version's staging area.
#[derive(Clone)]
pub enum StagingData {
    /// An immutable memtable; `HummockReadVersion::update` pushes it onto `staging.imm`.
    ImmMem(ImmutableMemtable),
    /// A shared batch of staging SSTs; `HummockReadVersion::update` removes the imms the
    /// batch lists for the instance and pushes the batch onto `staging.sst`.
    Sst(Arc<StagingSstableInfo>),
}
135
/// The kinds of change that `HummockReadVersion::update` can apply.
pub enum VersionUpdate {
    /// New staging data (an imm or a batch of staging SSTs).
    Staging(StagingData),
    /// A new committed version that replaces the current one.
    CommittedSnapshot(CommittedVersion),
    /// Watermarks reported for `epoch`. `HummockReadVersion::update` asserts that
    /// `watermark_type` is `WatermarkSerdeType::PkPrefix`.
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}
147
/// The uncommitted part of a read version: immutable memtables and staging SST batches.
#[derive(Clone)]
pub struct StagingVersion {
    /// Immutable memtables. `HummockReadVersion::update` pushes new imms at the front and
    /// pops imms from the back once a staging SST batch lists them.
    pub imm: VecDeque<ImmutableMemtable>,

    /// Staging SST batches. `HummockReadVersion::update` pushes new batches at the front.
    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}
158
159impl StagingVersion {
160 pub fn prune_overlap<'a>(
163 &'a self,
164 max_epoch_inclusive: HummockEpoch,
165 table_id: TableId,
166 table_key_range: &'a TableKeyRange,
167 ) -> (
168 impl Iterator<Item = &'a ImmutableMemtable> + 'a,
169 impl Iterator<Item = &'a SstableInfo> + 'a,
170 ) {
171 let (left, right) = table_key_range;
172 let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
173 let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
174 let overlapped_imms = self.imm.iter().filter(move |imm| {
175 imm.min_epoch() <= max_epoch_inclusive
177 && imm.table_id == table_id
178 && range_overlap(
179 &(left, right),
180 &imm.start_table_key(),
181 Included(&imm.end_table_key()),
182 )
183 });
184
185 let overlapped_ssts = self
187 .sst
188 .iter()
189 .filter(move |staging_sst| {
190 let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
191 sst_max_epoch <= max_epoch_inclusive
192 })
193 .flat_map(move |staging_sst| {
194 staging_sst
197 .sstable_infos
198 .iter()
199 .map(|sstable| &sstable.sst_info)
200 .filter(move |sstable: &&SstableInfo| {
201 filter_single_sst(sstable, table_id, table_key_range)
202 })
203 });
204 (overlapped_imms, overlapped_ssts)
205 }
206
207 pub fn is_empty(&self) -> bool {
208 self.imm.is_empty() && self.sst.is_empty()
209 }
210}
211
/// Per-table, per-instance view that combines staging data, the committed version, the
/// table watermark index and the vnode bitmap of the instance.
#[derive(Clone)]
pub struct HummockReadVersion {
    /// Table this read version belongs to; used to look up table-specific entries in the
    /// committed version.
    table_id: TableId,
    /// Local instance id; used to find this instance's imm ids in a staging SST batch.
    instance_id: LocalInstanceId,

    /// Uncommitted data (imms and staging SST batches).
    staging: StagingVersion,

    /// Most recently applied committed version.
    committed: CommittedVersion,

    /// When `true`, applying a committed snapshot silently drops imms whose `min_epoch()`
    /// is not above the committed epoch; when `false`, such imms trigger an assertion.
    is_replicated: bool,

    /// Index over `PkPrefix` table watermarks; `None` until such watermarks have been seen
    /// for this table.
    table_watermarks: Option<PkPrefixTableWatermarksIndex>,

    /// Bitmap of vnodes; `contains` tests membership against it.
    vnodes: Arc<Bitmap>,
}
236
237impl HummockReadVersion {
238 pub fn new_with_replication_option(
239 table_id: TableId,
240 instance_id: LocalInstanceId,
241 committed_version: CommittedVersion,
242 is_replicated: bool,
243 vnodes: Arc<Bitmap>,
244 ) -> Self {
245 assert!(committed_version.is_valid());
249 Self {
250 table_id,
251 instance_id,
252 table_watermarks: {
253 match committed_version.table_watermarks.get(&table_id) {
254 Some(table_watermarks) => match table_watermarks.watermark_type {
255 WatermarkSerdeType::PkPrefix => {
256 Some(PkPrefixTableWatermarksIndex::new_committed(
257 table_watermarks.clone(),
258 committed_version
259 .state_table_info
260 .info()
261 .get(&table_id)
262 .expect("should exist")
263 .committed_epoch,
264 ))
265 }
266
267 WatermarkSerdeType::NonPkPrefix => None, },
269 None => None,
270 }
271 },
272 staging: StagingVersion {
273 imm: VecDeque::default(),
274 sst: VecDeque::default(),
275 },
276
277 committed: committed_version,
278
279 is_replicated,
280 vnodes,
281 }
282 }
283
/// Creates a non-replicated read version; equivalent to calling
/// `new_with_replication_option` with `is_replicated = false`.
pub fn new(
    table_id: TableId,
    instance_id: LocalInstanceId,
    committed_version: CommittedVersion,
    vnodes: Arc<Bitmap>,
) -> Self {
    Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
}
292
/// Returns the id of the table this read version belongs to.
pub fn table_id(&self) -> TableId {
    self.table_id
}
296
/// Applies one [`VersionUpdate`] to this read version.
///
/// * `Staging(ImmMem)` pushes the imm onto the front of `staging.imm`.
/// * `Staging(Sst)` removes, from the back of `staging.imm`, the imms that the batch lists
///   for this instance and then pushes the batch onto the front of `staging.sst`. If the
///   batch lists nothing for this instance, a warning is logged and nothing changes.
/// * `CommittedSnapshot` replaces the committed version. If the new version has state
///   table info for this table, staging data covered by its committed epoch is pruned and
///   committed `PkPrefix` watermarks are applied to the watermark index first.
/// * `NewTableWatermark` records the watermarks of `epoch` in the watermark index,
///   creating the index when it does not exist yet.
///
/// # Panics
///
/// * `Staging(Sst)`: if `staging.imm` runs empty or the imm popped from its back does not
///   have the expected batch id.
/// * `CommittedSnapshot`: if this read version is not replicated and holds an imm whose
///   `min_epoch()` is not above the committed epoch, or if a retained staging SST batch
///   has a last epoch that is not above the committed epoch.
/// * `NewTableWatermark`: if `watermark_type` is not `PkPrefix`.
pub fn update(&mut self, info: VersionUpdate) {
    match info {
        VersionUpdate::Staging(staging) => match staging {
            StagingData::ImmMem(imm) => {
                if let Some(item) = self.staging.imm.front() {
                    // Debug builds verify that the incoming imm has a larger batch id than
                    // the imm currently at the front.
                    debug_assert!(item.batch_id() < imm.batch_id());
                }

                self.staging.imm.push_front(imm)
            }
            StagingData::Sst(staging_sst_ref) => {
                let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                    warn!(
                        instance_id = self.instance_id,
                        "no related imm in sst input"
                    );
                    return;
                };

                // Walk the listed ids in reverse while popping from the back of the queue;
                // every popped imm must carry exactly the id that is expected next.
                for imm_id in imms.iter().rev() {
                    let check_err = match self.staging.imm.pop_back() {
                        None => Some("empty".to_owned()),
                        Some(prev_imm_id) => {
                            if prev_imm_id.batch_id() == *imm_id {
                                None
                            } else {
                                Some(format!(
                                    "miss match id {} {}",
                                    prev_imm_id.batch_id(),
                                    *imm_id
                                ))
                            }
                        }
                    };
                    assert!(
                        check_err.is_none(),
                        "should be valid staging_sst.size {},
                                staging_sst.imm_ids {:?},
                                staging_sst.epochs {:?},
                                local_imm_ids {:?},
                                instance_id {}
                                check_err {:?}",
                        staging_sst_ref.imm_size,
                        staging_sst_ref.imm_ids,
                        staging_sst_ref.epochs,
                        self.staging
                            .imm
                            .iter()
                            .map(|imm| imm.batch_id())
                            .collect_vec(),
                        self.instance_id,
                        check_err
                    );
                }

                self.staging.sst.push_front(staging_sst_ref);
            }
        },

        VersionUpdate::CommittedSnapshot(committed_version) => {
            // Pruning only happens when the new version knows about this table.
            if let Some(info) = committed_version
                .state_table_info
                .info()
                .get(&self.table_id)
            {
                let committed_epoch = info.committed_epoch;
                self.staging.imm.retain(|imm| {
                    if self.is_replicated {
                        // Replicated read versions simply drop imms that are covered by
                        // the committed epoch.
                        imm.min_epoch() > committed_epoch
                    } else {
                        // Otherwise no such imm may exist at this point; every imm is kept.
                        assert!(imm.min_epoch() > committed_epoch);
                        true
                    }
                });

                // `epochs` is newest-first, so `first()` is the largest epoch of a batch:
                // a batch is kept only if it reaches beyond the committed epoch.
                self.staging.sst.retain(|sst| {
                    sst.epochs.first().expect("epochs not empty") > &committed_epoch
                });

                // Every retained batch must lie entirely above the committed epoch, i.e.
                // even its smallest epoch (`last()`) has to be larger.
                assert!(self.staging.sst.iter().all(|sst| {
                    sst.epochs.last().expect("epochs not empty") > &committed_epoch
                }));

                if let Some(committed_watermarks) =
                    committed_version.table_watermarks.get(&self.table_id)
                    && let WatermarkSerdeType::PkPrefix = committed_watermarks.watermark_type
                {
                    if let Some(watermark_index) = &mut self.table_watermarks {
                        watermark_index.apply_committed_watermarks(
                            committed_watermarks.clone(),
                            committed_epoch,
                        );
                    } else {
                        self.table_watermarks =
                            Some(PkPrefixTableWatermarksIndex::new_committed(
                                committed_watermarks.clone(),
                                committed_epoch,
                            ));
                    }
                }
            }

            self.committed = committed_version;
        }
        VersionUpdate::NewTableWatermark {
            direction,
            epoch,
            vnode_watermarks,
            watermark_type,
        } => {
            assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type);
            if let Some(watermark_index) = &mut self.table_watermarks {
                watermark_index.add_epoch_watermark(
                    epoch,
                    Arc::from(vnode_watermarks),
                    direction,
                );
            } else {
                // First watermark for this table: start a new index, seeded with the
                // table's committed epoch from the current committed version.
                self.table_watermarks = Some(PkPrefixTableWatermarksIndex::new(
                    direction,
                    epoch,
                    vnode_watermarks,
                    self.committed.table_committed_epoch(self.table_id),
                ));
            }
        }
    }
}
439
/// Returns the staging (uncommitted) part of this read version.
pub fn staging(&self) -> &StagingVersion {
    &self.staging
}
443
/// Returns the committed version currently held by this read version.
pub fn committed(&self) -> &CommittedVersion {
    &self.committed
}
447
/// Forwards `watermarks` to the watermark index's `filter_regress_watermarks`, which may
/// modify the vector in place. Does nothing when no watermark index exists.
pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
    if let Some(watermark_index) = &self.table_watermarks {
        watermark_index.filter_regress_watermarks(watermarks)
    }
}
457
458 pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
459 self.table_watermarks
460 .as_ref()
461 .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
462 }
463
/// Returns the replication flag this read version was created with.
pub fn is_replicated(&self) -> bool {
    self.is_replicated
}
467
/// Installs `vnodes` as the new vnode bitmap and returns the bitmap that was in place
/// before the call.
pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
    std::mem::replace(&mut self.vnodes, vnodes)
}
471
/// Returns whether the bit at `vnode`'s index is set in the vnode bitmap.
pub fn contains(&self, vnode: VirtualNode) -> bool {
    self.vnodes.is_set(vnode.to_index())
}
475
476 pub fn vnodes(&self) -> Arc<Bitmap> {
477 self.vnodes.clone()
478 }
479}
480
481pub fn read_filter_for_version(
482 epoch: HummockEpoch,
483 table_id: TableId,
484 mut table_key_range: TableKeyRange,
485 read_version: &RwLock<HummockReadVersion>,
486) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
487 let read_version_guard = read_version.read();
488
489 let committed_version = read_version_guard.committed().clone();
490
491 if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
492 watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
493 }
494
495 let (imm_iter, sst_iter) =
496 read_version_guard
497 .staging()
498 .prune_overlap(epoch, table_id, &table_key_range);
499
500 let imms = imm_iter.cloned().collect();
501 let ssts = sst_iter.cloned().collect();
502
503 Ok((table_key_range, (imms, ssts, committed_version)))
504}
505
/// Serves point gets, range iteration and change-log iteration on top of version
/// snapshots, loading SSTs through the SST store.
#[derive(Clone)]
pub struct HummockVersionReader {
    /// Store through which SSTs are loaded.
    sstable_store: SstableStoreRef,

    /// Metrics handle; cloned into the per-read metric guards and iterators.
    state_store_metrics: Arc<HummockStateStoreMetrics>,
    /// Copied into the SST read options as `max_preload_retry_times` when an iteration
    /// requests prefetching.
    preload_retry_times: usize,
}
514
impl HummockVersionReader {
    /// Creates a reader from its three parts without further initialisation.
    pub fn new(
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        preload_retry_times: usize,
    ) -> Self {
        Self {
            sstable_store,
            state_store_metrics,
            preload_retry_times,
        }
    }

    /// Returns the metrics handle used by this reader.
    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
        &self.state_store_metrics
    }
}
534
/// Threshold in seconds: when collecting the SSTs of the committed levels for an iterator
/// takes longer than this, `iter_inner` logs a warning and updates the
/// `iter_slow_fetch_meta_cache_unhits` metric.
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;
536
537impl HummockVersionReader {
/// Point lookup of `table_key` in `table_id` at `epoch`.
///
/// The sources are probed in this order and the first hit decides the result: the imms of
/// `read_version_tuple`, its staging SSTs, then the levels that the committed version
/// returns for the table.
///
/// For a hit:
/// * if its epoch (without the gap component) is below the minimum epoch derived from
///   `epoch` and `read_options.retention_seconds`, the result is `Ok(None)`;
/// * otherwise, if the entry converts into a user value, `on_key_value_fn` is called with
///   the full key and the value and its output is returned as `Ok(Some(_))`;
/// * otherwise (no user value — presumably a delete marker; confirm against
///   `into_user_value`) the result is `Ok(None)`.
///
/// Without any hit the result is `Ok(None)` and the `found_key` statistic is cleared.
///
/// # Errors
///
/// Propagates errors from SST lookups and from `on_key_value_fn`.
///
/// # Panics
///
/// Panics if the committed version is not valid.
pub async fn get<O>(
    &self,
    table_key: TableKey<Bytes>,
    epoch: u64,
    table_id: TableId,
    read_options: ReadOptions,
    read_version_tuple: ReadVersionTuple,
    on_key_value_fn: impl crate::store::KeyValueFn<O>,
) -> StorageResult<Option<O>> {
    let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

    let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
    let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
    let local_stats = &mut stats_guard.local_stats;
    // Assume a hit; the flag is reset at the very end when every source missed.
    local_stats.found_key = true;

    // 1. Immutable memtables.
    for imm in &imms {
        // An imm whose newest epoch is already below `min_epoch` cannot contribute.
        if imm.max_epoch() < min_epoch {
            continue;
        }

        local_stats.staging_imm_get_count += 1;

        if let Some((data, data_epoch)) = get_from_batch(
            imm,
            TableKey(table_key.as_ref()),
            epoch,
            &read_options,
            local_stats,
        ) {
            return Ok(if data_epoch.pure_epoch() < min_epoch {
                None
            } else {
                data.into_user_value()
                    .map(|v| {
                        on_key_value_fn(
                            FullKey::new_with_gap_epoch(
                                table_id,
                                table_key.to_ref(),
                                data_epoch,
                            ),
                            v.as_ref(),
                        )
                    })
                    .transpose()?
            });
        }
    }

    // Hash of the prefix hint (if one was supplied), forwarded to every SST lookup below.
    let dist_key_hash = read_options
        .prefix_hint
        .as_ref()
        .map(|dist_key| Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.table_id()));

    // Search key for the SST lookups: the table key at the read epoch, combined with
    // `MAX_SPILL_TIMES` as the gap component.
    let full_key = FullKey::new_with_gap_epoch(
        table_id,
        TableKey(table_key.clone()),
        EpochWithGap::new(epoch, MAX_SPILL_TIMES),
    );
    // 2. Staging SSTs.
    for local_sst in &uncommitted_ssts {
        local_stats.staging_sst_get_count += 1;
        if let Some(iter) = get_from_sstable_info(
            self.sstable_store.clone(),
            local_sst,
            full_key.to_ref(),
            &read_options,
            dist_key_hash,
            local_stats,
        )
        .await?
        {
            debug_assert!(iter.is_valid());
            let data_epoch = iter.key().epoch_with_gap;
            return Ok(if data_epoch.pure_epoch() < min_epoch {
                None
            } else {
                iter.value()
                    .into_user_value()
                    .map(|v| {
                        on_key_value_fn(
                            FullKey::new_with_gap_epoch(
                                table_id,
                                table_key.to_ref(),
                                data_epoch,
                            ),
                            v,
                        )
                    })
                    .transpose()?
            });
        }
    }
    // 3. Committed levels.
    let single_table_key_range = table_key.clone()..=table_key.clone();
    assert!(committed_version.is_valid());
    for level in committed_version.levels(table_id) {
        if level.table_infos.is_empty() {
            continue;
        }

        match level.level_type {
            LevelType::Overlapping | LevelType::Unspecified => {
                // Several SSTs of the level may cover the key, so each candidate that
                // survives pruning is probed in turn.
                let sstable_infos = prune_overlapping_ssts(
                    &level.table_infos,
                    table_id,
                    &single_table_key_range,
                );
                for sstable_info in sstable_infos {
                    local_stats.overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        sstable_info,
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
            LevelType::Nonoverlapping => {
                // At most one SST of the level is probed: the one just before the index
                // returned by `search_sst_idx`. NOTE(review): presumably that index is the
                // position of the first SST starting after the key — confirm.
                let mut table_info_idx =
                    search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                if table_info_idx == 0 {
                    continue;
                }
                table_info_idx = table_info_idx.saturating_sub(1);
                let ord = level.table_infos[table_info_idx]
                    .key_range
                    .compare_right_with_user_key(full_key.user_key.as_ref());
                // The candidate's right bound compares as less than the key, so the SST
                // cannot contain it.
                if ord == Ordering::Less {
                    sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                    continue;
                }

                local_stats.non_overlapping_get_count += 1;
                if let Some(iter) = get_from_sstable_info(
                    self.sstable_store.clone(),
                    &level.table_infos[table_info_idx],
                    full_key.to_ref(),
                    &read_options,
                    dist_key_hash,
                    local_stats,
                )
                .await?
                {
                    debug_assert!(iter.is_valid());
                    let data_epoch = iter.key().epoch_with_gap;
                    return Ok(if data_epoch.pure_epoch() < min_epoch {
                        None
                    } else {
                        iter.value()
                            .into_user_value()
                            .map(|v| {
                                on_key_value_fn(
                                    FullKey::new_with_gap_epoch(
                                        table_id,
                                        table_key.to_ref(),
                                        data_epoch,
                                    ),
                                    v,
                                )
                            })
                            .transpose()?
                    });
                }
            }
        }
    }
    // Nothing matched in any source.
    stats_guard.local_stats.found_key = false;
    Ok(None)
}
739
/// Creates a forward iterator over `table_key_range` of `table_id` at `epoch`.
///
/// Shorthand for `iter_with_memtable` without an additional memtable iterator.
pub async fn iter(
    &self,
    table_key_range: TableKeyRange,
    epoch: u64,
    table_id: TableId,
    read_options: ReadOptions,
    read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
) -> StorageResult<HummockStorageIterator> {
    self.iter_with_memtable(
        table_key_range,
        epoch,
        table_id,
        read_options,
        read_version_tuple,
        None,
    )
    .await
}
758
759 pub async fn iter_with_memtable<'b>(
760 &self,
761 table_key_range: TableKeyRange,
762 epoch: u64,
763 table_id: TableId,
764 read_options: ReadOptions,
765 read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
766 memtable_iter: Option<MemTableHummockIterator<'b>>,
767 ) -> StorageResult<HummockStorageIteratorInner<'b>> {
768 let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
769 let user_key_range = (
770 user_key_range_ref.0.map(|key| key.cloned()),
771 user_key_range_ref.1.map(|key| key.cloned()),
772 );
773 let mut factory = ForwardIteratorFactory::default();
774 let mut local_stats = StoreLocalStatistic::default();
775 let (imms, uncommitted_ssts, committed) = read_version_tuple;
776 let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
777 self.iter_inner(
778 table_key_range,
779 epoch,
780 table_id,
781 read_options,
782 imms,
783 uncommitted_ssts,
784 &committed,
785 &mut local_stats,
786 &mut factory,
787 )
788 .await?;
789 let merge_iter = factory.build(memtable_iter);
790 let mut user_iter = UserIterator::new(
792 merge_iter,
793 user_key_range,
794 epoch,
795 min_epoch,
796 Some(committed),
797 );
798 user_iter.rewind().await?;
799 Ok(HummockStorageIteratorInner::new(
800 user_iter,
801 self.state_store_metrics.clone(),
802 table_id,
803 local_stats,
804 ))
805 }
806
807 pub async fn rev_iter<'b>(
808 &self,
809 table_key_range: TableKeyRange,
810 epoch: u64,
811 table_id: TableId,
812 read_options: ReadOptions,
813 read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
814 memtable_iter: Option<MemTableHummockRevIterator<'b>>,
815 ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
816 let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
817 let user_key_range = (
818 user_key_range_ref.0.map(|key| key.cloned()),
819 user_key_range_ref.1.map(|key| key.cloned()),
820 );
821 let mut factory = BackwardIteratorFactory::default();
822 let mut local_stats = StoreLocalStatistic::default();
823 let (imms, uncommitted_ssts, committed) = read_version_tuple;
824 let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
825 self.iter_inner(
826 table_key_range,
827 epoch,
828 table_id,
829 read_options,
830 imms,
831 uncommitted_ssts,
832 &committed,
833 &mut local_stats,
834 &mut factory,
835 )
836 .await?;
837 let merge_iter = factory.build(memtable_iter);
838 let mut user_iter = BackwardUserIterator::new(
840 merge_iter,
841 user_key_range,
842 epoch,
843 min_epoch,
844 Some(committed),
845 );
846 user_iter.rewind().await?;
847 Ok(HummockStorageRevIteratorInner::new(
848 user_iter,
849 self.state_store_metrics.clone(),
850 table_id,
851 local_stats,
852 ))
853 }
854
/// Registers every data source relevant for iterating `table_key_range` of `table_id`
/// with `factory` and records the corresponding counters in `local_stats`.
///
/// Sources are added in this order: all `imms`, the staging SSTs, then the SSTs of the
/// levels that `committed` returns for the table. When `read_options.prefix_hint` is set,
/// SSTs whose metadata is loaded here are skipped if `hit_sstable_bloom_filter` rejects
/// them.
///
/// # Errors
///
/// Propagates errors from loading SSTs through the SST store.
///
/// # Panics
///
/// Panics if an SST loaded for an overlapping level reports an id different from the
/// `object_id` of its `SstableInfo`.
async fn iter_inner<F: IteratorFactory>(
    &self,
    table_key_range: TableKeyRange,
    epoch: u64,
    table_id: TableId,
    read_options: ReadOptions,
    imms: Vec<ImmutableMemtable>,
    uncommitted_ssts: Vec<SstableInfo>,
    committed: &CommittedVersion,
    local_stats: &mut StoreLocalStatistic,
    factory: &mut F,
) -> StorageResult<()> {
    // 1. Immutable memtables: all of them are added as they are.
    local_stats.staging_imm_iter_count = imms.len() as u64;
    for imm in imms {
        factory.add_batch_iter(imm);
    }

    // 2. Staging SSTs.
    let user_key_range = bound_table_key_range(table_id, &table_key_range);
    let user_key_range_ref = (
        user_key_range.0.as_ref().map(UserKey::as_ref),
        user_key_range.1.as_ref().map(UserKey::as_ref),
    );
    let mut staging_sst_iter_count = 0;
    // Hash of the prefix hint (if any) for the bloom filter checks below.
    let bloom_filter_prefix_hash = read_options
        .prefix_hint
        .as_ref()
        .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.table_id()));
    let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
    // The end key and the preload retry limit are only filled in for prefetching reads.
    if read_options.prefetch_options.prefetch {
        sst_read_options.must_iterated_end_user_key =
            Some(user_key_range.1.map(|key| key.cloned()));
        sst_read_options.max_preload_retry_times = self.preload_retry_times;
    }
    let sst_read_options = Arc::new(sst_read_options);
    for sstable_info in &uncommitted_ssts {
        let table_holder = self
            .sstable_store
            .sstable(sstable_info, local_stats)
            .await?;

        if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref() {
            if !hit_sstable_bloom_filter(
                &table_holder,
                &user_key_range_ref,
                *prefix_hash,
                local_stats,
            ) {
                continue;
            }
        }

        // Only SSTs that were actually added are counted.
        staging_sst_iter_count += 1;
        factory.add_staging_sst_iter(F::SstableIteratorType::create(
            table_holder,
            self.sstable_store.clone(),
            sst_read_options.clone(),
            sstable_info,
        ));
    }
    local_stats.staging_sst_iter_count = staging_sst_iter_count;

    // 3. Committed levels; the time spent here feeds the slow-fetch warning below.
    let timer = Instant::now();

    for level in committed.levels(table_id) {
        if level.table_infos.is_empty() {
            continue;
        }

        if level.level_type == LevelType::Nonoverlapping {
            let mut table_infos = prune_nonoverlapping_ssts(
                &level.table_infos,
                user_key_range_ref,
                table_id.table_id(),
            )
            .peekable();

            if table_infos.peek().is_none() {
                continue;
            }
            let sstable_infos = table_infos.cloned().collect_vec();
            if sstable_infos.len() > 1 {
                // Several SSTs of the level remain: they are handed to the factory as one
                // concat iterator. No SST is loaded and no bloom filter is checked here.
                factory.add_concat_sst_iter(
                    sstable_infos,
                    self.sstable_store.clone(),
                    sst_read_options.clone(),
                );
                local_stats.non_overlapping_iter_count += 1;
            } else {
                let sstable = self
                    .sstable_store
                    .sstable(&sstable_infos[0], local_stats)
                    .await?;

                if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref() {
                    if !hit_sstable_bloom_filter(
                        &sstable,
                        &user_key_range_ref,
                        *dist_hash,
                        local_stats,
                    ) {
                        continue;
                    }
                }
                // A single remaining SST is registered through the overlapping-SST entry
                // point of the factory, yet it is still counted as a non-overlapping
                // iterator in the statistics.
                factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                    sstable,
                    self.sstable_store.clone(),
                    sst_read_options.clone(),
                    &sstable_infos[0],
                ));
                local_stats.non_overlapping_iter_count += 1;
            }
        } else {
            let table_infos =
                prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
            // The pruned SSTs are visited in reverse order.
            let fetch_meta_req = table_infos.rev().collect_vec();
            if fetch_meta_req.is_empty() {
                continue;
            }
            for sstable_info in fetch_meta_req {
                let sstable = self
                    .sstable_store
                    .sstable(sstable_info, local_stats)
                    .await?;
                assert_eq!(sstable_info.object_id, sstable.id);
                if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref() {
                    if !hit_sstable_bloom_filter(
                        &sstable,
                        &user_key_range_ref,
                        *dist_hash,
                        local_stats,
                    ) {
                        continue;
                    }
                }
                factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                    sstable,
                    self.sstable_store.clone(),
                    sst_read_options.clone(),
                    sstable_info,
                ));
                local_stats.overlapping_iter_count += 1;
            }
        }
    }
    // Report unusually slow collection of the committed SSTs.
    let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
    if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
        let table_id_string = table_id.to_string();
        tracing::warn!(
            "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
            table_id_string,
            epoch,
            fetch_meta_duration_sec,
            local_stats.cache_meta_block_miss
        );
        self.state_store_metrics
            .iter_slow_fetch_meta_cache_unhits
            .set(local_stats.cache_meta_block_miss as i64);
    }
    Ok(())
}
1025
1026 pub async fn iter_log(
1027 &self,
1028 version: PinnedVersion,
1029 epoch_range: (u64, u64),
1030 key_range: TableKeyRange,
1031 options: ReadLogOptions,
1032 ) -> HummockResult<ChangeLogIterator> {
1033 let change_log = {
1034 let table_change_logs = version.table_change_log_read_lock();
1035 if let Some(change_log) = table_change_logs.get(&options.table_id) {
1036 change_log.filter_epoch(epoch_range).cloned().collect_vec()
1037 } else {
1038 Vec::new()
1039 }
1040 };
1041
1042 if let Some(max_epoch_change_log) = change_log.last() {
1043 let (_, max_epoch) = epoch_range;
1044 if !max_epoch_change_log.epochs().contains(&max_epoch) {
1045 warn!(
1046 max_epoch,
1047 change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
1048 table_id = options.table_id.table_id,
1049 "max_epoch does not exist"
1050 );
1051 }
1052 }
1053 let read_options = Arc::new(SstableIteratorReadOptions {
1054 cache_policy: Default::default(),
1055 must_iterated_end_user_key: None,
1056 max_preload_retry_times: 0,
1057 prefetch_for_large_query: false,
1058 });
1059
1060 async fn make_iter(
1061 sstable_infos: impl Iterator<Item = &SstableInfo>,
1062 sstable_store: &SstableStoreRef,
1063 read_options: Arc<SstableIteratorReadOptions>,
1064 local_stat: &mut StoreLocalStatistic,
1065 ) -> HummockResult<MergeIterator<SstableIterator>> {
1066 let iters = try_join_all(sstable_infos.map(|sstable_info| {
1067 let sstable_store = sstable_store.clone();
1068 let read_options = read_options.clone();
1069 async move {
1070 let mut local_stat = StoreLocalStatistic::default();
1071 let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
1072 Ok::<_, HummockError>((
1073 SstableIterator::new(
1074 table_holder,
1075 sstable_store,
1076 read_options,
1077 sstable_info,
1078 ),
1079 local_stat,
1080 ))
1081 }
1082 }))
1083 .await?;
1084 Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
1085 |(iter, stats)| {
1086 local_stat.add(&stats);
1087 iter
1088 },
1089 )))
1090 }
1091
1092 let mut local_stat = StoreLocalStatistic::default();
1093
1094 let new_value_iter = make_iter(
1095 change_log
1096 .iter()
1097 .flat_map(|log| log.new_value.iter())
1098 .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
1099 &self.sstable_store,
1100 read_options.clone(),
1101 &mut local_stat,
1102 )
1103 .await?;
1104 let old_value_iter = make_iter(
1105 change_log
1106 .iter()
1107 .flat_map(|log| log.old_value.iter())
1108 .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
1109 &self.sstable_store,
1110 read_options.clone(),
1111 &mut local_stat,
1112 )
1113 .await?;
1114 ChangeLogIterator::new(
1115 epoch_range,
1116 key_range,
1117 new_value_iter,
1118 old_value_iter,
1119 options.table_id,
1120 IterLocalMetricsGuard::new(
1121 self.state_store_metrics.clone(),
1122 options.table_id,
1123 local_stat,
1124 ),
1125 )
1126 .await
1127 }
1128}