// risingwave_storage/hummock/validator.rs
use std::borrow::BorrowMut;
use std::cmp;
use std::collections::HashMap;
use std::sync::Arc;

use risingwave_hummock_sdk::compact_task::ValidationTask;
use risingwave_hummock_sdk::key::FullKey;

use crate::hummock::iterator::HummockIterator;
use crate::hummock::sstable::SstableIteratorReadOptions;
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::{CachePolicy, SstableIterator};
use crate::monitor::StoreLocalStatistic;

29pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) {
33 let mut visited_keys = HashMap::new();
34 let mut unused = StoreLocalStatistic::default();
35 for sstable_info in task.sst_infos {
36 let mut key_counts = 0;
37 let worker_id = *task
38 .sst_id_to_worker_id
39 .get(&sstable_info.object_id)
40 .expect("valid worker_id");
41 tracing::debug!(
42 "Validating SST sst_id {} object_id {} from worker {}",
43 sstable_info.sst_id,
44 sstable_info.object_id,
45 worker_id,
46 );
47 let holder = match sstable_store
48 .sstable(&sstable_info, unused.borrow_mut())
49 .await
50 {
51 Ok(holder) => holder,
52 Err(_err) => {
53 tracing::info!(
55 "Skip sanity check for SST sst_id {} object_id {} .",
56 sstable_info.sst_id,
57 sstable_info.object_id,
58 );
59 continue;
60 }
61 };
62
63 let mut iter = SstableIterator::new(
66 holder,
67 sstable_store.clone(),
68 Arc::new(SstableIteratorReadOptions {
69 cache_policy: CachePolicy::NotFill,
70 must_iterated_end_user_key: None,
71 max_preload_retry_times: 0,
72 prefetch_for_large_query: false,
73 }),
74 &sstable_info,
75 );
76 let mut previous_key: Option<FullKey<Vec<u8>>> = None;
77 if let Err(_err) = iter.rewind().await {
78 tracing::info!(
79 "Skip sanity check for SST sst_id {} object_id {}.",
80 sstable_info.sst_id,
81 sstable_info.object_id
82 );
83 }
84 while iter.is_valid() {
85 key_counts += 1;
86 let current_key = iter.key().to_vec();
87 if let Some((duplicate_sst_object_id, duplicate_worker_id)) =
89 visited_keys.get(¤t_key).cloned()
90 {
91 panic!(
92 "SST sanity check failed: Duplicate key {:x?} in SST object {} from worker {} and SST object {} from worker {}",
93 current_key,
94 sstable_info.object_id,
95 worker_id,
96 duplicate_sst_object_id,
97 duplicate_worker_id
98 )
99 }
100 visited_keys.insert(current_key.to_owned(), (sstable_info.object_id, worker_id));
101 if let Some(previous_key) = previous_key.take() {
103 let cmp = previous_key.cmp(¤t_key);
104 if cmp != cmp::Ordering::Less {
105 panic!(
106 "SST sanity check failed: For SST sst_id {} object_id {}, expect {:x?} < {:x?}, got {:#?}",
107 sstable_info.sst_id, sstable_info.object_id, previous_key, current_key, cmp
108 )
109 }
110 }
111 previous_key = Some(current_key);
112 if let Err(_err) = iter.next().await {
113 tracing::info!(
114 "Skip remaining sanity check for SST {}",
115 sstable_info.object_id
116 );
117 break;
118 }
119 }
120 tracing::debug!(
121 "Validated {} keys for SST sst_id {} object_id {}",
122 key_counts,
123 sstable_info.sst_id,
124 sstable_info.object_id
125 );
126 iter.collect_local_statistic(&mut unused);
127 unused.ignore();
128 }
129}