risingwave_ctl/cmd_impl/hummock/
validate_version.rs1use std::cmp::Ordering;
16
17use chrono::DateTime;
18use chrono::offset::Utc;
19use itertools::Itertools;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_hummock_sdk::compaction_group::hummock_version_ext;
22use risingwave_hummock_sdk::key::{FullKey, UserKey};
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
25use risingwave_hummock_sdk::{HummockVersionId, version_archive_dir};
26use risingwave_object_store::object::ObjectStoreRef;
27use risingwave_pb::hummock::HummockVersionArchive;
28use risingwave_pb::hummock::group_delta::DeltaType;
29use risingwave_pb::id::HummockSstableId;
30use risingwave_rpc_client::HummockMetaClient;
31use risingwave_storage::hummock::value::HummockValue;
32use risingwave_storage::hummock::{Block, BlockHolder, BlockIterator, SstableStoreRef};
33use risingwave_storage::monitor::StoreLocalStatistic;
34
35use crate::CtlContext;
36use crate::common::HummockServiceOpts;
37
38pub async fn validate_version(context: &CtlContext) -> anyhow::Result<()> {
39 let meta_client = context.meta_client().await?;
40 let version = meta_client.get_current_version().await?;
41 let result = hummock_version_ext::validate_version(&version);
42 if !result.is_empty() {
43 println!("Invalid HummockVersion. Violation lists:");
44 for s in result {
45 println!("{}", s);
46 }
47 }
48
49 Ok(())
50}
51
52async fn get_archive(
53 archive_id: HummockVersionId,
54 data_dir: &str,
55 archive_object_store: ObjectStoreRef,
56) -> anyhow::Result<HummockVersionArchive> {
57 use prost::Message;
58 let archive_dir = version_archive_dir(data_dir);
59 let archive_path = format!("{archive_dir}/{archive_id}");
60 let archive_bytes = archive_object_store.read(&archive_path, ..).await?;
61 let archive: HummockVersionArchive = HummockVersionArchive::decode(archive_bytes)?;
62 Ok(archive)
63}
64
65pub async fn print_user_key_in_archive(
66 context: &CtlContext,
67 archive_ids: impl IntoIterator<Item = HummockVersionId>,
68 data_dir: String,
69 user_key: String,
70 use_new_object_prefix_strategy: bool,
71) -> anyhow::Result<()> {
72 let user_key_bytes = hex::decode(user_key.clone()).unwrap_or_else(|_| {
73 panic!("cannot decode user key {} into raw bytes", user_key);
74 });
75 let user_key = UserKey::decode(&user_key_bytes);
76 println!("user key: {user_key:?}");
77
78 let hummock_opts =
79 HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?;
80 let hummock = context.hummock_store(hummock_opts).await?;
81 let sstable_store = hummock.sstable_store();
82 let archive_object_store = sstable_store.store();
83 for archive_id in archive_ids.into_iter().sorted() {
84 println!("search archive {archive_id}");
85 let archive = get_archive(archive_id, &data_dir, archive_object_store.clone()).await?;
86 let mut base_version =
87 HummockVersion::from_persisted_protobuf(archive.version.as_ref().unwrap());
88 print_user_key_in_version(sstable_store.clone(), &base_version, &user_key).await?;
89 for delta in &archive.version_deltas {
90 base_version.apply_version_delta(&HummockVersionDelta::from_persisted_protobuf(delta));
91 print_user_key_in_version(sstable_store.clone(), &base_version, &user_key).await?;
92 }
93 }
94 Ok(())
95}
96
97async fn print_user_key_in_version(
98 sstable_store: SstableStoreRef,
99 version: &HummockVersion,
100 target_key: &UserKey<&[u8]>,
101) -> anyhow::Result<()> {
102 println!("print key {:?} in version {}", target_key, version.id);
103 for cg in version.levels.values() {
104 for level in cg.l0.sub_levels.iter().rev().chain(cg.levels.iter()) {
105 for sstable_info in &level.table_infos {
106 let key_range = &sstable_info.key_range;
107 let left_user_key = FullKey::decode(&key_range.left);
108 let right_user_key = FullKey::decode(&key_range.right);
109 if left_user_key.user_key > *target_key || *target_key > right_user_key.user_key {
110 continue;
111 }
112 print_user_key_in_sst(sstable_store.clone(), sstable_info, target_key).await?;
113 }
114 }
115 }
116 Ok(())
117}
118
119async fn print_user_key_in_sst(
120 sstable_store: SstableStoreRef,
121 sst: &SstableInfo,
122 user_key: &UserKey<&[u8]>,
123) -> anyhow::Result<()> {
124 let mut dummy = StoreLocalStatistic::default();
126 let sst_metadata = sstable_store.sstable(sst, &mut dummy).await?;
127 dummy.ignore();
128 let data_path = sstable_store.get_sst_data_path(sst_metadata.id);
129 let mut is_first = true;
130 for block_meta in &sst_metadata.meta.block_metas {
131 let range =
132 block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
133 let block_data = sstable_store.store().read(&data_path, range).await?;
134 let block = Box::new(Block::decode(block_data, block_meta.uncompressed_size as _).unwrap());
135 let holder = BlockHolder::from_owned_block(block);
136 let mut block_iter = BlockIterator::new(holder);
137 block_iter.seek_to_first();
138 while block_iter.is_valid() {
139 let full_key = block_iter.key();
140 if full_key.user_key.cmp(user_key) != Ordering::Equal {
141 block_iter.next();
142 continue;
143 }
144 let full_val = block_iter.value();
145 let hummock_val = HummockValue::from_slice(full_val)?;
146 let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
147 let date_time = DateTime::<Utc>::from(epoch.as_system_time());
148 if is_first {
149 is_first = false;
150 println!("\t\tSST id: {}, object id: {}", sst.sst_id, sst.object_id);
151 }
152 println!("\t\t key: {:?}, len={}", full_key, full_key.encoded_len());
153 println!(
154 "\t\t value: {:?}, len={}",
155 hummock_val,
156 hummock_val.encoded_len()
157 );
158 println!(
159 "\t\t epoch: {} offset = {} ({})",
160 epoch,
161 full_key.epoch_with_gap.offset(),
162 date_time
163 );
164 println!();
165 block_iter.next();
166 }
167 }
168 Ok(())
169}
170
171pub async fn print_version_delta_in_archive(
172 context: &CtlContext,
173 archive_ids: impl IntoIterator<Item = HummockVersionId>,
174 data_dir: String,
175 sst_id: HummockSstableId,
176 use_new_object_prefix_strategy: bool,
177) -> anyhow::Result<()> {
178 let hummock_opts =
179 HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?;
180 let hummock = context.hummock_store(hummock_opts).await?;
181 let sstable_store = hummock.sstable_store();
182 let archive_object_store = sstable_store.store();
183 for archive_id in archive_ids.into_iter().sorted() {
184 println!("search archive {archive_id}");
185 let archive = get_archive(archive_id, &data_dir, archive_object_store.clone()).await?;
186 for delta in &archive.version_deltas {
187 let mut is_first = true;
188 for (cg_id, deltas) in &delta.group_deltas {
189 for d in &deltas.group_deltas {
190 let d = d.delta_type.as_ref().unwrap();
191 if match_delta(d, sst_id) {
192 if is_first {
193 is_first = false;
194 println!(
195 "delta: id {}, prev_id {}, trivial_move {}",
196 delta.id, delta.prev_id, delta.trivial_move
197 );
198 }
199 println!("compaction group id {cg_id}");
200 print_delta(d);
201 }
202 }
203 }
204 }
205 }
206 Ok(())
207}
208
209fn match_delta(delta: &DeltaType, sst_id: HummockSstableId) -> bool {
210 match delta {
211 DeltaType::GroupConstruct(_)
212 | DeltaType::GroupDestroy(_)
213 | DeltaType::GroupMerge(_)
214 | DeltaType::TruncateTables(_) => false,
215 DeltaType::IntraLevel(delta) => {
216 delta
217 .inserted_table_infos
218 .iter()
219 .any(|sst| sst.sst_id == sst_id)
220 || delta.removed_table_ids.contains(&sst_id)
221 }
222 DeltaType::NewL0SubLevel(delta) => delta
223 .inserted_table_infos
224 .iter()
225 .any(|sst| sst.sst_id == sst_id),
226 }
227}
228
229fn print_delta(delta: &DeltaType) {
230 println!("{:?}", delta);
231}