risingwave_ctl/cmd_impl/hummock/
validate_version.rs1use std::cmp::Ordering;
16
17use chrono::DateTime;
18use chrono::offset::Utc;
19use itertools::Itertools;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_hummock_sdk::compaction_group::hummock_version_ext;
22use risingwave_hummock_sdk::key::{FullKey, UserKey};
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
25use risingwave_hummock_sdk::{HummockSstableObjectId, HummockVersionId, version_archive_dir};
26use risingwave_object_store::object::ObjectStoreRef;
27use risingwave_pb::hummock::HummockVersionArchive;
28use risingwave_pb::hummock::group_delta::DeltaType;
29use risingwave_rpc_client::HummockMetaClient;
30use risingwave_storage::hummock::value::HummockValue;
31use risingwave_storage::hummock::{Block, BlockHolder, BlockIterator, SstableStoreRef};
32use risingwave_storage::monitor::StoreLocalStatistic;
33
34use crate::CtlContext;
35use crate::common::HummockServiceOpts;
36
37pub async fn validate_version(context: &CtlContext) -> anyhow::Result<()> {
38 let meta_client = context.meta_client().await?;
39 let version = meta_client.get_current_version().await?;
40 let result = hummock_version_ext::validate_version(&version);
41 if !result.is_empty() {
42 println!("Invalid HummockVersion. Violation lists:");
43 for s in result {
44 println!("{}", s);
45 }
46 }
47
48 Ok(())
49}
50
51async fn get_archive(
52 archive_id: HummockVersionId,
53 data_dir: &str,
54 archive_object_store: ObjectStoreRef,
55) -> anyhow::Result<HummockVersionArchive> {
56 use prost::Message;
57 let archive_dir = version_archive_dir(data_dir);
58 let archive_path = format!("{archive_dir}/{archive_id}");
59 let archive_bytes = archive_object_store.read(&archive_path, ..).await?;
60 let archive: HummockVersionArchive = HummockVersionArchive::decode(archive_bytes)?;
61 Ok(archive)
62}
63
64pub async fn print_user_key_in_archive(
65 context: &CtlContext,
66 archive_ids: impl IntoIterator<Item = HummockVersionId>,
67 data_dir: String,
68 user_key: String,
69 use_new_object_prefix_strategy: bool,
70) -> anyhow::Result<()> {
71 let user_key_bytes = hex::decode(user_key.clone()).unwrap_or_else(|_| {
72 panic!("cannot decode user key {} into raw bytes", user_key);
73 });
74 let user_key = UserKey::decode(&user_key_bytes);
75 println!("user key: {user_key:?}");
76
77 let hummock_opts =
78 HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?;
79 let hummock = context.hummock_store(hummock_opts).await?;
80 let sstable_store = hummock.sstable_store();
81 let archive_object_store = sstable_store.store();
82 for archive_id in archive_ids.into_iter().sorted() {
83 println!("search archive {archive_id}");
84 let archive = get_archive(archive_id, &data_dir, archive_object_store.clone()).await?;
85 let mut base_version =
86 HummockVersion::from_persisted_protobuf(archive.version.as_ref().unwrap());
87 print_user_key_in_version(sstable_store.clone(), &base_version, &user_key).await?;
88 for delta in &archive.version_deltas {
89 base_version.apply_version_delta(&HummockVersionDelta::from_persisted_protobuf(delta));
90 print_user_key_in_version(sstable_store.clone(), &base_version, &user_key).await?;
91 }
92 }
93 Ok(())
94}
95
96async fn print_user_key_in_version(
97 sstable_store: SstableStoreRef,
98 version: &HummockVersion,
99 target_key: &UserKey<&[u8]>,
100) -> anyhow::Result<()> {
101 println!("print key {:?} in version {}", target_key, version.id);
102 for cg in version.levels.values() {
103 for level in cg.l0.sub_levels.iter().rev().chain(cg.levels.iter()) {
104 for sstable_info in &level.table_infos {
105 let key_range = &sstable_info.key_range;
106 let left_user_key = FullKey::decode(&key_range.left);
107 let right_user_key = FullKey::decode(&key_range.right);
108 if left_user_key.user_key > *target_key || *target_key > right_user_key.user_key {
109 continue;
110 }
111 print_user_key_in_sst(sstable_store.clone(), sstable_info, target_key).await?;
112 }
113 }
114 }
115 Ok(())
116}
117
118async fn print_user_key_in_sst(
119 sstable_store: SstableStoreRef,
120 sst: &SstableInfo,
121 user_key: &UserKey<&[u8]>,
122) -> anyhow::Result<()> {
123 let mut dummy = StoreLocalStatistic::default();
125 let sst_metadata = sstable_store.sstable(sst, &mut dummy).await?;
126 dummy.ignore();
127 let data_path = sstable_store.get_sst_data_path(sst_metadata.id);
128 let mut is_first = true;
129 for block_meta in &sst_metadata.meta.block_metas {
130 let range =
131 block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
132 let block_data = sstable_store.store().read(&data_path, range).await?;
133 let block = Box::new(Block::decode(block_data, block_meta.uncompressed_size as _).unwrap());
134 let holder = BlockHolder::from_owned_block(block);
135 let mut block_iter = BlockIterator::new(holder);
136 block_iter.seek_to_first();
137 while block_iter.is_valid() {
138 let full_key = block_iter.key();
139 if full_key.user_key.cmp(user_key) != Ordering::Equal {
140 block_iter.next();
141 continue;
142 }
143 let full_val = block_iter.value();
144 let hummock_val = HummockValue::from_slice(full_val)?;
145 let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
146 let date_time = DateTime::<Utc>::from(epoch.as_system_time());
147 if is_first {
148 is_first = false;
149 println!("\t\tSST id: {}, object id: {}", sst.sst_id, sst.object_id);
150 }
151 println!("\t\t key: {:?}, len={}", full_key, full_key.encoded_len());
152 println!(
153 "\t\t value: {:?}, len={}",
154 hummock_val,
155 hummock_val.encoded_len()
156 );
157 println!(
158 "\t\t epoch: {} offset = {} ({})",
159 epoch,
160 full_key.epoch_with_gap.offset(),
161 date_time
162 );
163 println!();
164 block_iter.next();
165 }
166 }
167 Ok(())
168}
169
170pub async fn print_version_delta_in_archive(
171 context: &CtlContext,
172 archive_ids: impl IntoIterator<Item = HummockVersionId>,
173 data_dir: String,
174 sst_id: HummockSstableObjectId,
175 use_new_object_prefix_strategy: bool,
176) -> anyhow::Result<()> {
177 let hummock_opts =
178 HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?;
179 let hummock = context.hummock_store(hummock_opts).await?;
180 let sstable_store = hummock.sstable_store();
181 let archive_object_store = sstable_store.store();
182 for archive_id in archive_ids.into_iter().sorted() {
183 println!("search archive {archive_id}");
184 let archive = get_archive(archive_id, &data_dir, archive_object_store.clone()).await?;
185 for delta in &archive.version_deltas {
186 let mut is_first = true;
187 for (cg_id, deltas) in &delta.group_deltas {
188 for d in &deltas.group_deltas {
189 let d = d.delta_type.as_ref().unwrap();
190 if match_delta(d, sst_id) {
191 if is_first {
192 is_first = false;
193 println!(
194 "delta: id {}, prev_id {}, trivial_move {}",
195 delta.id, delta.prev_id, delta.trivial_move
196 );
197 }
198 println!("compaction group id {cg_id}");
199 print_delta(d);
200 }
201 }
202 }
203 }
204 }
205 Ok(())
206}
207
208fn match_delta(delta: &DeltaType, sst_id: HummockSstableObjectId) -> bool {
209 match delta {
210 DeltaType::GroupConstruct(_) | DeltaType::GroupDestroy(_) | DeltaType::GroupMerge(_) => {
211 false
212 }
213 DeltaType::IntraLevel(delta) => {
214 delta
215 .inserted_table_infos
216 .iter()
217 .any(|sst| sst.sst_id == sst_id)
218 || delta.removed_table_ids.contains(&sst_id)
219 }
220 DeltaType::NewL0SubLevel(delta) => delta
221 .inserted_table_infos
222 .iter()
223 .any(|sst| sst.sst_id == sst_id),
224 }
225}
226
227fn print_delta(delta: &DeltaType) {
228 println!("{:?}", delta);
229}