// risingwave_storage/hummock/validator.rs
use std::borrow::BorrowMut;
use std::cmp;
use std::collections::HashMap;
use std::sync::Arc;
use risingwave_hummock_sdk::compact_task::ValidationTask;
use risingwave_hummock_sdk::key::FullKey;
use crate::hummock::iterator::HummockIterator;
use crate::hummock::sstable::SstableIteratorReadOptions;
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::{CachePolicy, SstableIterator};
use crate::monitor::StoreLocalStatistic;
pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) {
let mut visited_keys = HashMap::new();
let mut unused = StoreLocalStatistic::default();
for sst in task.sst_infos {
let mut key_counts = 0;
let worker_id = *task
.sst_id_to_worker_id
.get(&sst.object_id)
.expect("valid worker_id");
tracing::debug!("Validating SST {} from worker {}", sst.object_id, worker_id,);
let holder = match sstable_store.sstable(&sst, unused.borrow_mut()).await {
Ok(holder) => holder,
Err(_err) => {
tracing::info!("Skip sanity check for SST {}.", sst.object_id);
continue;
}
};
let mut iter = SstableIterator::new(
holder,
sstable_store.clone(),
Arc::new(SstableIteratorReadOptions {
cache_policy: CachePolicy::NotFill,
must_iterated_end_user_key: None,
max_preload_retry_times: 0,
prefetch_for_large_query: false,
}),
);
let mut previous_key: Option<FullKey<Vec<u8>>> = None;
if let Err(_err) = iter.rewind().await {
tracing::info!("Skip sanity check for SST {}.", sst.object_id);
}
while iter.is_valid() {
key_counts += 1;
let current_key = iter.key().to_vec();
if let Some((duplicate_sst_object_id, duplicate_worker_id)) =
visited_keys.get(¤t_key).cloned()
{
panic!("SST sanity check failed: Duplicate key {:x?} in SST object {} from worker {} and SST object {} from worker {}",
current_key,
sst.object_id,
worker_id,
duplicate_sst_object_id,
duplicate_worker_id)
}
visited_keys.insert(current_key.to_owned(), (sst.object_id, worker_id));
if let Some(previous_key) = previous_key.take() {
let cmp = previous_key.cmp(¤t_key);
if cmp != cmp::Ordering::Less {
panic!(
"SST sanity check failed: For SST {}, expect {:x?} < {:x?}, got {:#?}",
sst.object_id, previous_key, current_key, cmp
)
}
}
previous_key = Some(current_key);
if let Err(_err) = iter.next().await {
tracing::info!("Skip remaining sanity check for SST {}", sst.object_id,);
break;
}
}
tracing::debug!("Validated {} keys for SST {}", key_counts, sst.object_id,);
iter.collect_local_statistic(&mut unused);
unused.ignore();
}
}