use std::cmp::Ordering;
use std::collections::vec_deque::VecDeque;
use std::collections::HashMap;
use std::ops::Bound::Included;
use std::sync::Arc;
use std::time::Instant;
use bytes::Bytes;
use futures::future::try_join_all;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::change_log::EpochNewChangeLog;
use risingwave_hummock_sdk::key::{
bound_table_key_range, FullKey, TableKey, TableKeyRange, UserKey,
};
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::{
TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
};
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::LevelType;
use sync_point::sync_point;
use tracing::warn;
use crate::error::StorageResult;
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::iterator::{
BackwardUserIterator, IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::utils::{
filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
search_sst_idx,
};
use crate::hummock::{
get_from_batch, get_from_sstable_info, hit_sstable_bloom_filter, BackwardIteratorFactory,
ForwardIteratorFactory, HummockError, HummockResult, HummockStorageIterator,
HummockStorageIteratorInner, HummockStorageRevIteratorInner, ReadVersionTuple, Sstable,
SstableIterator,
};
use crate::mem_table::{
ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
};
use crate::monitor::{
GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
};
use crate::store::{gen_min_epoch, ReadLogOptions, ReadOptions, StateStoreKeyedRow};
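/// The committed part of a read version: an alias of the pinned Hummock version.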
pub type CommittedVersion = PinnedVersion;
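/// Metadata of SSTs that have been spilled from immutable memtables but are not
/// yet committed, together with the epochs they cover and the imm ids they replace.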
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
sstable_infos: Vec<LocalSstableInfo>,
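    /// SSTs holding old values, read by the table change log (see `iter_log`).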
old_value_sstable_infos: Vec<LocalSstableInfo>,
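    /// Epochs covered by these SSTs, sorted from newest to oldest.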
epochs: Vec<HummockEpoch>,
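    /// Ids of the imms flushed into these SSTs, keyed by local instance.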
imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
imm_size: usize,
}
impl StagingSstableInfo {
pub fn new(
sstable_infos: Vec<LocalSstableInfo>,
old_value_sstable_infos: Vec<LocalSstableInfo>,
epochs: Vec<HummockEpoch>,
imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
imm_size: usize,
) -> Self {
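        // Invariant: epochs are sorted in descending order (newest first).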
assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
Self {
sstable_infos,
old_value_sstable_infos,
epochs,
imm_ids,
imm_size,
}
}
pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
&self.sstable_infos
}
pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
&self.old_value_sstable_infos
}
pub fn imm_size(&self) -> usize {
self.imm_size
}
pub fn epochs(&self) -> &Vec<HummockEpoch> {
&self.epochs
}
pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
&self.imm_ids
}
}
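/// Uncommitted data staged in a read version: either an immutable memtable or a
/// batch of spilled SSTs.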
#[derive(Clone)]
pub enum StagingData {
ImmMem(ImmutableMemtable),
Sst(Arc<StagingSstableInfo>),
}
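/// An update applied to a `HummockReadVersion`: new staging data, a newly
/// committed snapshot, or a new table watermark.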
pub enum VersionUpdate {
Staging(StagingData),
CommittedSnapshot(CommittedVersion),
NewTableWatermark {
direction: WatermarkDirection,
epoch: HummockEpoch,
vnode_watermarks: Vec<VnodeWatermark>,
},
}
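/// Uncommitted staging data. Both queues are ordered from newest to oldest:
/// new entries are pushed to the front.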
#[derive(Clone)]
pub struct StagingVersion {
pub imm: VecDeque<ImmutableMemtable>,
pub sst: VecDeque<Arc<StagingSstableInfo>>,
}
impl StagingVersion {
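    /// Returns the imms and staging ssts that may contain data of `table_id`
    /// within `table_key_range` at an epoch no greater than `max_epoch_inclusive`.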
pub fn prune_overlap<'a>(
&'a self,
max_epoch_inclusive: HummockEpoch,
table_id: TableId,
table_key_range: &'a TableKeyRange,
) -> (
impl Iterator<Item = &'a ImmutableMemtable> + 'a,
impl Iterator<Item = &'a SstableInfo> + 'a,
) {
let (ref left, ref right) = table_key_range;
let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
let overlapped_imms = self.imm.iter().filter(move |imm| {
imm.min_epoch() <= max_epoch_inclusive
&& imm.table_id == table_id
&& range_overlap(
&(left, right),
&imm.start_table_key(),
Included(&imm.end_table_key()),
)
});
let overlapped_ssts = self
.sst
.iter()
            .filter(move |staging_sst| {
                // `epochs` is sorted from newest to oldest, so the first entry is
                // the max epoch covered by this staging sst. Include the sst only
                // when all of its data is visible at `max_epoch_inclusive`.
                let sst_max_epoch = *staging_sst.epochs.first().expect("epochs not empty");
                sst_max_epoch <= max_epoch_inclusive
            })
.flat_map(move |staging_sst| {
staging_sst
.sstable_infos
.iter()
.map(|sstable| &sstable.sst_info)
.filter(move |sstable: &&SstableInfo| {
filter_single_sst(sstable, table_id, table_key_range)
})
});
(overlapped_imms, overlapped_ssts)
}
pub fn is_empty(&self) -> bool {
self.imm.is_empty() && self.sst.is_empty()
}
}
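/// A per-instance view of one table's data: local staging (uncommitted) data
/// layered on top of the shared committed version.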
#[derive(Clone)]
pub struct HummockReadVersion {
table_id: TableId,
instance_id: LocalInstanceId,
staging: StagingVersion,
committed: CommittedVersion,
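    /// Whether this read version replicates data owned elsewhere. Replicated
    /// versions are skipped during global reads to avoid duplicate results.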
is_replicated: bool,
table_watermarks: Option<TableWatermarksIndex>,
vnodes: Arc<Bitmap>,
}
impl HummockReadVersion {
pub fn new_with_replication_option(
table_id: TableId,
instance_id: LocalInstanceId,
committed_version: CommittedVersion,
is_replicated: bool,
vnodes: Arc<Bitmap>,
) -> Self {
assert!(committed_version.is_valid());
Self {
table_id,
instance_id,
table_watermarks: committed_version.table_watermarks.get(&table_id).map(
|table_watermarks| {
TableWatermarksIndex::new_committed(
table_watermarks.clone(),
committed_version
.state_table_info
.info()
.get(&table_id)
.expect("should exist")
.committed_epoch,
)
},
),
staging: StagingVersion {
imm: VecDeque::default(),
sst: VecDeque::default(),
},
committed: committed_version,
is_replicated,
vnodes,
}
}
pub fn new(
table_id: TableId,
instance_id: LocalInstanceId,
committed_version: CommittedVersion,
vnodes: Arc<Bitmap>,
) -> Self {
Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
}
pub fn table_id(&self) -> TableId {
self.table_id
}
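    /// Applies a `VersionUpdate`:
    /// - `Staging(ImmMem)` pushes a new imm to the front of the imm queue;
    /// - `Staging(Sst)` pops the imms that were flushed into the sst from the
    ///   back of the queue and pushes the sst to the front of the sst queue;
    /// - `CommittedSnapshot` replaces the committed version and evicts staging
    ///   data at or below the new committed epoch.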
pub fn update(&mut self, info: VersionUpdate) {
match info {
VersionUpdate::Staging(staging) => match staging {
StagingData::ImmMem(imm) => {
if let Some(item) = self.staging.imm.front() {
debug_assert!(item.batch_id() < imm.batch_id());
}
self.staging.imm.push_front(imm)
}
StagingData::Sst(staging_sst_ref) => {
let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
warn!(
instance_id = self.instance_id,
"no related imm in sst input"
);
return;
};
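                    // Pop the flushed imms from the back (oldest first) and
                    // check that their ids match the sst's record.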
for imm_id in imms.iter().rev() {
let check_err = match self.staging.imm.pop_back() {
None => Some("empty".to_string()),
Some(prev_imm_id) => {
if prev_imm_id.batch_id() == *imm_id {
None
} else {
                                    Some(format!(
                                        "mismatched imm id: got {}, expected {}",
                                        prev_imm_id.batch_id(),
                                        *imm_id
                                    ))
}
}
};
                        assert!(
                            check_err.is_none(),
                            "invalid staging sst: imm_size {}, imm_ids {:?}, epochs {:?}, local imm_ids {:?}, instance_id {}, check_err {:?}",
                            staging_sst_ref.imm_size,
                            staging_sst_ref.imm_ids,
                            staging_sst_ref.epochs,
                            self.staging
                                .imm
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.instance_id,
                            check_err
                        );
}
self.staging.sst.push_front(staging_sst_ref);
}
},
VersionUpdate::CommittedSnapshot(committed_version) => {
if let Some(info) = committed_version
.state_table_info
.info()
.get(&self.table_id)
{
let committed_epoch = info.committed_epoch;
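                    // Evict staging data now covered by the committed snapshot.
                    // A non-replicated instance must not hold any imm at or
                    // below the committed epoch.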
self.staging.imm.retain(|imm| {
if self.is_replicated {
imm.min_epoch() > committed_epoch
} else {
assert!(imm.min_epoch() > committed_epoch);
true
}
});
self.staging.sst.retain(|sst| {
sst.epochs.first().expect("epochs not empty") > &committed_epoch
});
assert!(self.staging.sst.iter().all(|sst| {
sst.epochs.last().expect("epochs not empty") > &committed_epoch
}));
if let Some(committed_watermarks) =
self.committed.table_watermarks.get(&self.table_id)
{
if let Some(watermark_index) = &mut self.table_watermarks {
watermark_index.apply_committed_watermarks(
committed_watermarks.clone(),
committed_epoch,
);
} else {
self.table_watermarks = Some(TableWatermarksIndex::new_committed(
committed_watermarks.clone(),
committed_epoch,
));
}
}
}
self.committed = committed_version;
}
VersionUpdate::NewTableWatermark {
direction,
epoch,
vnode_watermarks,
} => {
if let Some(watermark_index) = &mut self.table_watermarks {
watermark_index.add_epoch_watermark(
epoch,
Arc::from(vnode_watermarks),
direction,
);
} else {
self.table_watermarks = Some(TableWatermarksIndex::new(
direction,
epoch,
vnode_watermarks,
self.committed.table_committed_epoch(self.table_id),
));
}
}
}
}
pub fn staging(&self) -> &StagingVersion {
&self.staging
}
pub fn committed(&self) -> &CommittedVersion {
&self.committed
}
pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
if let Some(watermark_index) = &self.table_watermarks {
watermark_index.filter_regress_watermarks(watermarks)
}
}
pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
self.table_watermarks
.as_ref()
.and_then(|watermark_index| watermark_index.latest_watermark(vnode))
}
pub fn is_replicated(&self) -> bool {
self.is_replicated
}
pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
std::mem::replace(&mut self.vnodes, vnodes)
}
pub fn contains(&self, vnode: VirtualNode) -> bool {
self.vnodes.is_set(vnode.to_index())
}
pub fn vnodes(&self) -> Arc<Bitmap> {
self.vnodes.clone()
}
}
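/// Snapshots a read version under its lock: rewrites the key range with the
/// table watermark (if any), then collects the overlapping imms and staging
/// ssts together with the committed version.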
pub fn read_filter_for_version(
epoch: HummockEpoch,
table_id: TableId,
mut table_key_range: TableKeyRange,
read_version: &RwLock<HummockReadVersion>,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
let read_version_guard = read_version.read();
let committed_version = read_version_guard.committed().clone();
if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
}
let (imm_iter, sst_iter) =
read_version_guard
.staging()
.prune_overlap(epoch, table_id, &table_key_range);
let imms = imm_iter.cloned().collect();
let ssts = sst_iter.cloned().collect();
Ok((table_key_range, (imms, ssts, committed_version)))
}
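/// A stateless reader that serves gets, iterators and log iterators against a
/// `ReadVersionTuple`. It holds only the sstable store, metrics and the preload
/// retry config, so it can be cloned freely.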
#[derive(Clone)]
pub struct HummockVersionReader {
sstable_store: SstableStoreRef,
state_store_metrics: Arc<HummockStateStoreMetrics>,
preload_retry_times: usize,
}
impl HummockVersionReader {
pub fn new(
sstable_store: SstableStoreRef,
state_store_metrics: Arc<HummockStateStoreMetrics>,
preload_retry_times: usize,
) -> Self {
Self {
sstable_store,
state_store_metrics,
preload_retry_times,
}
}
pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
&self.state_store_metrics
}
}
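/// Threshold above which fetching sstable meta during iterator creation is
/// reported as slow.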
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;
impl HummockVersionReader {
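    /// Point get. Searches the staging imms, then the staging ssts, then the
    /// committed levels, and returns the newest visible version of `table_key`
    /// whose epoch lies within `[min_epoch, epoch]`.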
pub async fn get(
&self,
table_key: TableKey<Bytes>,
epoch: u64,
read_options: ReadOptions,
read_version_tuple: ReadVersionTuple,
) -> StorageResult<Option<StateStoreKeyedRow>> {
let (imms, uncommitted_ssts, committed_version) = read_version_tuple;
let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
let mut stats_guard =
GetLocalMetricsGuard::new(self.state_store_metrics.clone(), read_options.table_id);
let local_stats = &mut stats_guard.local_stats;
local_stats.found_key = true;
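        // 1. Search the staging imms, newest first.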
for imm in &imms {
if imm.max_epoch() < min_epoch {
continue;
}
local_stats.staging_imm_get_count += 1;
if let Some((data, data_epoch)) = get_from_batch(
imm,
TableKey(table_key.as_ref()),
epoch,
&read_options,
local_stats,
) {
return Ok(if data_epoch.pure_epoch() < min_epoch {
None
} else {
data.into_user_value().map(|v| {
(
FullKey::new_with_gap_epoch(
read_options.table_id,
table_key.clone(),
data_epoch,
),
v,
)
})
});
}
}
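        // 2. Search the spilled staging ssts, pruning by the bloom-filter hash
        //    of the prefix hint when one is provided.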
let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| {
Sstable::hash_for_bloom_filter(dist_key.as_ref(), read_options.table_id.table_id())
});
let full_key = FullKey::new_with_gap_epoch(
read_options.table_id,
TableKey(table_key.clone()),
EpochWithGap::new(epoch, MAX_SPILL_TIMES),
);
for local_sst in &uncommitted_ssts {
local_stats.staging_sst_get_count += 1;
if let Some((data, data_epoch)) = get_from_sstable_info(
self.sstable_store.clone(),
local_sst,
full_key.to_ref(),
&read_options,
dist_key_hash,
local_stats,
)
.await?
{
return Ok(if data_epoch.pure_epoch() < min_epoch {
None
} else {
data.into_user_value().map(|v| {
(
FullKey::new_with_gap_epoch(
read_options.table_id,
table_key.clone(),
data_epoch,
),
v,
)
})
});
}
}
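        // 3. Search the committed version level by level.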
let single_table_key_range = table_key.clone()..=table_key.clone();
assert!(committed_version.is_valid());
for level in committed_version.levels(read_options.table_id) {
if level.table_infos.is_empty() {
continue;
}
match level.level_type {
LevelType::Overlapping | LevelType::Unspecified => {
let sstable_infos = prune_overlapping_ssts(
&level.table_infos,
read_options.table_id,
&single_table_key_range,
);
for sstable_info in sstable_infos {
local_stats.overlapping_get_count += 1;
if let Some((data, data_epoch)) = get_from_sstable_info(
self.sstable_store.clone(),
sstable_info,
full_key.to_ref(),
&read_options,
dist_key_hash,
local_stats,
)
.await?
{
return Ok(if data_epoch.pure_epoch() < min_epoch {
None
} else {
data.into_user_value().map(|v| {
(
FullKey::new_with_gap_epoch(
read_options.table_id,
table_key.clone(),
data_epoch,
),
v,
)
})
});
}
}
}
LevelType::Nonoverlapping => {
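                    // Non-overlapping level: binary-search for the single sst
                    // whose key range may cover the key.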
let mut table_info_idx =
search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
if table_info_idx == 0 {
continue;
}
table_info_idx = table_info_idx.saturating_sub(1);
let ord = level.table_infos[table_info_idx]
.key_range
.compare_right_with_user_key(full_key.user_key.as_ref());
if ord == Ordering::Less {
sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
continue;
}
local_stats.non_overlapping_get_count += 1;
if let Some((data, data_epoch)) = get_from_sstable_info(
self.sstable_store.clone(),
&level.table_infos[table_info_idx],
full_key.to_ref(),
&read_options,
dist_key_hash,
local_stats,
)
.await?
{
return Ok(if data_epoch.pure_epoch() < min_epoch {
None
} else {
data.into_user_value().map(|v| {
(
FullKey::new_with_gap_epoch(
read_options.table_id,
table_key.clone(),
data_epoch,
),
v,
)
})
});
}
}
}
}
stats_guard.local_stats.found_key = false;
Ok(None)
}
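    /// Creates a forward iterator over `table_key_range` at `epoch`, without a
    /// local memtable overlay.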
pub async fn iter(
&self,
table_key_range: TableKeyRange,
epoch: u64,
read_options: ReadOptions,
read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
) -> StorageResult<HummockStorageIterator> {
self.iter_with_memtable(
table_key_range,
epoch,
read_options,
read_version_tuple,
None,
)
.await
}
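    /// Creates a forward iterator that merges an optional local memtable
    /// iterator on top of the imms, staging ssts and committed ssts.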
pub async fn iter_with_memtable<'a, 'b>(
&'a self,
table_key_range: TableKeyRange,
epoch: u64,
read_options: ReadOptions,
read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
memtable_iter: Option<MemTableHummockIterator<'b>>,
) -> StorageResult<HummockStorageIteratorInner<'b>> {
let user_key_range_ref = bound_table_key_range(read_options.table_id, &table_key_range);
let user_key_range = (
user_key_range_ref.0.map(|key| key.cloned()),
user_key_range_ref.1.map(|key| key.cloned()),
);
let mut factory = ForwardIteratorFactory::default();
let mut local_stats = StoreLocalStatistic::default();
let (imms, uncommitted_ssts, committed) = read_version_tuple;
let table_id = read_options.table_id;
let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
self.iter_inner(
table_key_range,
epoch,
read_options,
imms,
uncommitted_ssts,
&committed,
&mut local_stats,
&mut factory,
)
.await?;
let merge_iter = factory.build(memtable_iter);
let mut user_iter = UserIterator::new(
merge_iter,
user_key_range,
epoch,
min_epoch,
Some(committed),
);
user_iter.rewind().await?;
Ok(HummockStorageIteratorInner::new(
user_iter,
self.state_store_metrics.clone(),
table_id,
local_stats,
))
}
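    /// Backward counterpart of `iter_with_memtable`.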
pub async fn rev_iter<'a, 'b>(
&'a self,
table_key_range: TableKeyRange,
epoch: u64,
read_options: ReadOptions,
read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
memtable_iter: Option<MemTableHummockRevIterator<'b>>,
) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
let user_key_range_ref = bound_table_key_range(read_options.table_id, &table_key_range);
let user_key_range = (
user_key_range_ref.0.map(|key| key.cloned()),
user_key_range_ref.1.map(|key| key.cloned()),
);
let mut factory = BackwardIteratorFactory::default();
let mut local_stats = StoreLocalStatistic::default();
let (imms, uncommitted_ssts, committed) = read_version_tuple;
let table_id = read_options.table_id;
let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
self.iter_inner(
table_key_range,
epoch,
read_options,
imms,
uncommitted_ssts,
&committed,
&mut local_stats,
&mut factory,
)
.await?;
let merge_iter = factory.build(memtable_iter);
let mut user_iter = BackwardUserIterator::new(
merge_iter,
user_key_range,
epoch,
min_epoch,
Some(committed),
);
user_iter.rewind().await?;
Ok(HummockStorageRevIteratorInner::new(
user_iter,
self.state_store_metrics.clone(),
table_id,
local_stats,
))
}
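    /// Collects the iterators over the imms, staging ssts and committed levels
    /// into `factory`, applying bloom-filter pruning when a prefix hint is set.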
pub async fn iter_inner<F: IteratorFactory>(
&self,
table_key_range: TableKeyRange,
epoch: u64,
read_options: ReadOptions,
imms: Vec<ImmutableMemtable>,
uncommitted_ssts: Vec<SstableInfo>,
committed: &CommittedVersion,
local_stats: &mut StoreLocalStatistic,
factory: &mut F,
) -> StorageResult<()> {
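        // Add an iterator for every staging imm.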
local_stats.staging_imm_iter_count = imms.len() as u64;
for imm in imms {
factory.add_batch_iter(imm);
}
let user_key_range = bound_table_key_range(read_options.table_id, &table_key_range);
let user_key_range_ref = (
user_key_range.0.as_ref().map(UserKey::as_ref),
user_key_range.1.as_ref().map(UserKey::as_ref),
);
let mut staging_sst_iter_count = 0;
let bloom_filter_prefix_hash = read_options
.prefix_hint
.as_ref()
.map(|hint| Sstable::hash_for_bloom_filter(hint, read_options.table_id.table_id()));
let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
if read_options.prefetch_options.prefetch {
sst_read_options.must_iterated_end_user_key =
Some(user_key_range.1.map(|key| key.cloned()));
sst_read_options.max_preload_retry_times = self.preload_retry_times;
}
let sst_read_options = Arc::new(sst_read_options);
for sstable_info in &uncommitted_ssts {
let table_holder = self
.sstable_store
.sstable(sstable_info, local_stats)
.await?;
if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref() {
if !hit_sstable_bloom_filter(
&table_holder,
&user_key_range_ref,
*prefix_hash,
local_stats,
) {
continue;
}
}
staging_sst_iter_count += 1;
factory.add_staging_sst_iter(F::SstableIteratorType::create(
table_holder,
self.sstable_store.clone(),
sst_read_options.clone(),
));
}
local_stats.staging_sst_iter_count = staging_sst_iter_count;
let timer = Instant::now();
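        // Build iterators over the committed levels, timing the meta fetches so
        // slow iterator creation can be reported below.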
for level in committed.levels(read_options.table_id) {
if level.table_infos.is_empty() {
continue;
}
if level.level_type == LevelType::Nonoverlapping {
let table_infos = prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref);
let sstables = table_infos
.filter(|sstable_info| {
sstable_info
.table_ids
.binary_search(&read_options.table_id.table_id)
.is_ok()
})
.cloned()
.collect_vec();
if sstables.is_empty() {
continue;
}
if sstables.len() > 1 {
factory.add_concat_sst_iter(
sstables,
self.sstable_store.clone(),
sst_read_options.clone(),
);
local_stats.non_overlapping_iter_count += 1;
} else {
let sstable = self
.sstable_store
.sstable(&sstables[0], local_stats)
.await?;
if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref() {
if !hit_sstable_bloom_filter(
&sstable,
&user_key_range_ref,
*dist_hash,
local_stats,
) {
continue;
}
}
factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
sstable,
self.sstable_store.clone(),
sst_read_options.clone(),
));
local_stats.non_overlapping_iter_count += 1;
}
} else {
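                // Overlapping level: any pruned sst may contain the range, so
                // open one iterator per sst.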
let table_infos = prune_overlapping_ssts(
&level.table_infos,
read_options.table_id,
&table_key_range,
);
let fetch_meta_req = table_infos.rev().collect_vec();
if fetch_meta_req.is_empty() {
continue;
}
for sstable_info in fetch_meta_req {
let sstable = self
.sstable_store
.sstable(sstable_info, local_stats)
.await?;
assert_eq!(sstable_info.object_id, sstable.id);
if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref() {
if !hit_sstable_bloom_filter(
&sstable,
&user_key_range_ref,
*dist_hash,
local_stats,
) {
continue;
}
}
factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
sstable,
self.sstable_store.clone(),
sst_read_options.clone(),
));
local_stats.overlapping_iter_count += 1;
}
}
}
let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = read_options.table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {} at epoch {} is slow: duration = {:?}s, cache unhits = {}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
self.state_store_metrics
.iter_slow_fetch_meta_cache_unhits
.set(local_stats.cache_meta_block_miss as i64);
}
Ok(())
}
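    /// Builds a change-log iterator over `epoch_range`, merging the new-value
    /// and old-value ssts of the table's change log that overlap `key_range`.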
pub async fn iter_log(
&self,
version: PinnedVersion,
epoch_range: (u64, u64),
key_range: TableKeyRange,
options: ReadLogOptions,
) -> HummockResult<ChangeLogIterator> {
let change_log = if let Some(change_log) = version.table_change_log.get(&options.table_id) {
change_log.filter_epoch(epoch_range)
} else {
static EMPTY_VEC: Vec<EpochNewChangeLog> = Vec::new();
&EMPTY_VEC[..]
};
if let Some(max_epoch_change_log) = change_log.last() {
let (_, max_epoch) = epoch_range;
if !max_epoch_change_log.epochs.contains(&max_epoch) {
warn!(
max_epoch,
change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs.iter()).collect_vec(),
table_id = options.table_id.table_id,
"max_epoch does not exist"
);
}
}
let read_options = Arc::new(SstableIteratorReadOptions {
cache_policy: Default::default(),
must_iterated_end_user_key: None,
max_preload_retry_times: 0,
prefetch_for_large_query: false,
});
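        // Opens all ssts concurrently and merges their iterators, folding the
        // per-sst read statistics into `local_stat`.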
async fn make_iter(
ssts: impl Iterator<Item = &SstableInfo>,
sstable_store: &SstableStoreRef,
read_options: Arc<SstableIteratorReadOptions>,
local_stat: &mut StoreLocalStatistic,
) -> HummockResult<MergeIterator<SstableIterator>> {
let iters = try_join_all(ssts.map(|sst| {
let sstable_store = sstable_store.clone();
let read_options = read_options.clone();
async move {
let mut local_stat = StoreLocalStatistic::default();
let table_holder = sstable_store.sstable(sst, &mut local_stat).await?;
Ok::<_, HummockError>((
SstableIterator::new(table_holder, sstable_store, read_options),
local_stat,
))
}
}))
.await?;
Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
|(iter, stats)| {
local_stat.add(&stats);
iter
},
)))
}
let mut local_stat = StoreLocalStatistic::default();
let new_value_iter = make_iter(
change_log
.iter()
.flat_map(|log| log.new_value.iter())
.filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
&self.sstable_store,
read_options.clone(),
&mut local_stat,
)
.await?;
let old_value_iter = make_iter(
change_log
.iter()
.flat_map(|log| log.old_value.iter())
.filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
&self.sstable_store,
read_options.clone(),
&mut local_stat,
)
.await?;
ChangeLogIterator::new(
epoch_range,
key_range,
new_value_iter,
old_value_iter,
options.table_id,
IterLocalMetricsGuard::new(
self.state_store_metrics.clone(),
options.table_id,
local_stat,
),
)
.await
}
}