// risingwave_ctl/cmd_impl/hummock/validate_version.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;

use anyhow::Context;
use chrono::DateTime;
use chrono::offset::Utc;
use itertools::Itertools;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext;
use risingwave_hummock_sdk::key::{FullKey, UserKey};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{HummockSstableObjectId, HummockVersionId, version_archive_dir};
use risingwave_object_store::object::ObjectStoreRef;
use risingwave_pb::hummock::HummockVersionArchive;
use risingwave_pb::hummock::group_delta::DeltaType;
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{Block, BlockHolder, BlockIterator, SstableStoreRef};
use risingwave_storage::monitor::StoreLocalStatistic;

use crate::CtlContext;
use crate::common::HummockServiceOpts;

37pub async fn validate_version(context: &CtlContext) -> anyhow::Result<()> {
38    let meta_client = context.meta_client().await?;
39    let version = meta_client.get_current_version().await?;
40    let result = hummock_version_ext::validate_version(&version);
41    if !result.is_empty() {
42        println!("Invalid HummockVersion. Violation lists:");
43        for s in result {
44            println!("{}", s);
45        }
46    }
47
48    Ok(())
49}
50
51async fn get_archive(
52    archive_id: HummockVersionId,
53    data_dir: &str,
54    archive_object_store: ObjectStoreRef,
55) -> anyhow::Result<HummockVersionArchive> {
56    use prost::Message;
57    let archive_dir = version_archive_dir(data_dir);
58    let archive_path = format!("{archive_dir}/{archive_id}");
59    let archive_bytes = archive_object_store.read(&archive_path, ..).await?;
60    let archive: HummockVersionArchive = HummockVersionArchive::decode(archive_bytes)?;
61    Ok(archive)
62}
63
64pub async fn print_user_key_in_archive(
65    context: &CtlContext,
66    archive_ids: impl IntoIterator<Item = HummockVersionId>,
67    data_dir: String,
68    user_key: String,
69    use_new_object_prefix_strategy: bool,
70) -> anyhow::Result<()> {
71    let user_key_bytes = hex::decode(user_key.clone()).unwrap_or_else(|_| {
72        panic!("cannot decode user key {} into raw bytes", user_key);
73    });
74    let user_key = UserKey::decode(&user_key_bytes);
75    println!("user key: {user_key:?}");
76
77    let hummock_opts =
78        HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?;
79    let hummock = context.hummock_store(hummock_opts).await?;
80    let sstable_store = hummock.sstable_store();
81    let archive_object_store = sstable_store.store();
82    for archive_id in archive_ids.into_iter().sorted() {
83        println!("search archive {archive_id}");
84        let archive = get_archive(archive_id, &data_dir, archive_object_store.clone()).await?;
85        let mut base_version =
86            HummockVersion::from_persisted_protobuf(archive.version.as_ref().unwrap());
87        print_user_key_in_version(sstable_store.clone(), &base_version, &user_key).await?;
88        for delta in &archive.version_deltas {
89            base_version.apply_version_delta(&HummockVersionDelta::from_persisted_protobuf(delta));
90            print_user_key_in_version(sstable_store.clone(), &base_version, &user_key).await?;
91        }
92    }
93    Ok(())
94}
95
96async fn print_user_key_in_version(
97    sstable_store: SstableStoreRef,
98    version: &HummockVersion,
99    target_key: &UserKey<&[u8]>,
100) -> anyhow::Result<()> {
101    println!("print key {:?} in version {}", target_key, version.id);
102    for cg in version.levels.values() {
103        for level in cg.l0.sub_levels.iter().rev().chain(cg.levels.iter()) {
104            for sstable_info in &level.table_infos {
105                let key_range = &sstable_info.key_range;
106                let left_user_key = FullKey::decode(&key_range.left);
107                let right_user_key = FullKey::decode(&key_range.right);
108                if left_user_key.user_key > *target_key || *target_key > right_user_key.user_key {
109                    continue;
110                }
111                print_user_key_in_sst(sstable_store.clone(), sstable_info, target_key).await?;
112            }
113        }
114    }
115    Ok(())
116}
117
118async fn print_user_key_in_sst(
119    sstable_store: SstableStoreRef,
120    sst: &SstableInfo,
121    user_key: &UserKey<&[u8]>,
122) -> anyhow::Result<()> {
123    // The implementation is mostly the same as `sst_dump`, with additional filter by `user_key`.
124    let mut dummy = StoreLocalStatistic::default();
125    let sst_metadata = sstable_store.sstable(sst, &mut dummy).await?;
126    dummy.ignore();
127    let data_path = sstable_store.get_sst_data_path(sst_metadata.id);
128    let mut is_first = true;
129    for block_meta in &sst_metadata.meta.block_metas {
130        let range =
131            block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
132        let block_data = sstable_store.store().read(&data_path, range).await?;
133        let block = Box::new(Block::decode(block_data, block_meta.uncompressed_size as _).unwrap());
134        let holder = BlockHolder::from_owned_block(block);
135        let mut block_iter = BlockIterator::new(holder);
136        block_iter.seek_to_first();
137        while block_iter.is_valid() {
138            let full_key = block_iter.key();
139            if full_key.user_key.cmp(user_key) != Ordering::Equal {
140                block_iter.next();
141                continue;
142            }
143            let full_val = block_iter.value();
144            let hummock_val = HummockValue::from_slice(full_val)?;
145            let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
146            let date_time = DateTime::<Utc>::from(epoch.as_system_time());
147            if is_first {
148                is_first = false;
149                println!("\t\tSST id: {}, object id: {}", sst.sst_id, sst.object_id);
150            }
151            println!("\t\t   key: {:?}, len={}", full_key, full_key.encoded_len());
152            println!(
153                "\t\t value: {:?}, len={}",
154                hummock_val,
155                hummock_val.encoded_len()
156            );
157            println!(
158                "\t\t epoch: {} offset = {}  ({})",
159                epoch,
160                full_key.epoch_with_gap.offset(),
161                date_time
162            );
163            println!();
164            block_iter.next();
165        }
166    }
167    Ok(())
168}
169
170pub async fn print_version_delta_in_archive(
171    context: &CtlContext,
172    archive_ids: impl IntoIterator<Item = HummockVersionId>,
173    data_dir: String,
174    sst_id: HummockSstableObjectId,
175    use_new_object_prefix_strategy: bool,
176) -> anyhow::Result<()> {
177    let hummock_opts =
178        HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?;
179    let hummock = context.hummock_store(hummock_opts).await?;
180    let sstable_store = hummock.sstable_store();
181    let archive_object_store = sstable_store.store();
182    for archive_id in archive_ids.into_iter().sorted() {
183        println!("search archive {archive_id}");
184        let archive = get_archive(archive_id, &data_dir, archive_object_store.clone()).await?;
185        for delta in &archive.version_deltas {
186            let mut is_first = true;
187            for (cg_id, deltas) in &delta.group_deltas {
188                for d in &deltas.group_deltas {
189                    let d = d.delta_type.as_ref().unwrap();
190                    if match_delta(d, sst_id) {
191                        if is_first {
192                            is_first = false;
193                            println!(
194                                "delta: id {}, prev_id {}, trivial_move {}",
195                                delta.id, delta.prev_id, delta.trivial_move
196                            );
197                        }
198                        println!("compaction group id {cg_id}");
199                        print_delta(d);
200                    }
201                }
202            }
203        }
204    }
205    Ok(())
206}
207
208fn match_delta(delta: &DeltaType, sst_id: HummockSstableObjectId) -> bool {
209    match delta {
210        DeltaType::GroupConstruct(_) | DeltaType::GroupDestroy(_) | DeltaType::GroupMerge(_) => {
211            false
212        }
213        DeltaType::IntraLevel(delta) => {
214            delta
215                .inserted_table_infos
216                .iter()
217                .any(|sst| sst.sst_id == sst_id)
218                || delta.removed_table_ids.contains(&sst_id)
219        }
220        DeltaType::NewL0SubLevel(delta) => delta
221            .inserted_table_infos
222            .iter()
223            .any(|sst| sst.sst_id == sst_id),
224    }
225}
226
227fn print_delta(delta: &DeltaType) {
228    println!("{:?}", delta);
229}