risingwave_ctl/cmd_impl/hummock/sst_dump.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use chrono::DateTime;
20use chrono::offset::Utc;
21use clap::Args;
22use futures::TryStreamExt;
23use itertools::Itertools;
24use risingwave_common::types::ToText;
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::iter_util::ZipEqFast;
27use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
28use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
29use risingwave_frontend::TableCatalog;
30use risingwave_hummock_sdk::HummockSstableObjectId;
31use risingwave_hummock_sdk::key::FullKey;
32use risingwave_hummock_sdk::level::Level;
33use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
34use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
35use risingwave_rpc_client::MetaClient;
36use risingwave_storage::hummock::value::HummockValue;
37use risingwave_storage::hummock::{
38    Block, BlockHolder, BlockIterator, CompressionAlgorithm, Sstable, SstableStore,
39};
40use risingwave_storage::monitor::StoreLocalStatistic;
41use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
42
43use crate::CtlContext;
44use crate::common::HummockServiceOpts;
45
/// Table catalogs indexed by table id; used to decode entry values column by column.
type TableData = HashMap<u32, TableCatalog>;
47
48#[derive(Args, Debug)]
49pub struct SstDumpArgs {
50    #[clap(short, long = "object-id")]
51    object_id: Option<u64>,
52    #[clap(short, long = "block-id")]
53    block_id: Option<u64>,
54    #[clap(short = 'p', long = "print-entry")]
55    print_entry: bool,
56    #[clap(short = 'l', long = "print-level")]
57    print_level: bool,
58    #[clap(short = 't', long = "print-table")]
59    print_table: bool,
60    #[clap(short = 'd')]
61    data_dir: Option<String>,
62    #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
63    use_new_object_prefix_strategy: bool,
64}
65
66pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
67    println!("Start sst dump with args: {:?}", args);
68    let table_data = if args.print_entry && args.print_table {
69        let meta_client = context.meta_client().await?;
70        load_table_schemas(&meta_client).await?
71    } else {
72        TableData::default()
73    };
74    if args.print_level {
75        // Level information is retrieved from meta service
76        let hummock = context
77            .hummock_store(HummockServiceOpts::from_env(
78                args.data_dir.clone(),
79                args.use_new_object_prefix_strategy,
80            )?)
81            .await?;
82        let version = hummock.inner().get_pinned_version().clone();
83        let sstable_store = hummock.sstable_store();
84        for level in version.get_combined_levels() {
85            for sstable_info in &level.table_infos {
86                if let Some(object_id) = &args.object_id {
87                    if *object_id == sstable_info.object_id {
88                        print_level(level, sstable_info);
89                        sst_dump_via_sstable_store(
90                            &sstable_store,
91                            sstable_info.object_id,
92                            sstable_info.meta_offset,
93                            sstable_info.file_size,
94                            &table_data,
95                            &args,
96                        )
97                        .await?;
98                        return Ok(());
99                    }
100                } else {
101                    print_level(level, sstable_info);
102                    sst_dump_via_sstable_store(
103                        &sstable_store,
104                        sstable_info.object_id,
105                        sstable_info.meta_offset,
106                        sstable_info.file_size,
107                        &table_data,
108                        &args,
109                    )
110                    .await?;
111                }
112            }
113        }
114    } else {
115        // Object information is retrieved from object store. Meta service is not required.
116        let hummock_service_opts = HummockServiceOpts::from_env(
117            args.data_dir.clone(),
118            args.use_new_object_prefix_strategy,
119        )?;
120        let sstable_store = hummock_service_opts
121            .create_sstable_store(args.use_new_object_prefix_strategy)
122            .await?;
123        if let Some(obj_id) = &args.object_id {
124            let obj_store = sstable_store.store();
125            let obj_path = sstable_store.get_sst_data_path(*obj_id);
126            let obj = obj_store.metadata(&obj_path).await?;
127            print_object(&obj);
128            let meta_offset = get_meta_offset_from_object(&obj, obj_store.as_ref()).await?;
129            let obj_id = SstableStore::get_object_id_from_path(&obj.key);
130            sst_dump_via_sstable_store(
131                &sstable_store,
132                obj_id,
133                meta_offset,
134                obj.total_size as u64,
135                &table_data,
136                &args,
137            )
138            .await?;
139        } else {
140            let mut metadata_iter = sstable_store
141                .list_object_metadata_from_object_store(None, None, None)
142                .await?;
143            while let Some(obj) = metadata_iter.try_next().await? {
144                print_object(&obj);
145                let meta_offset =
146                    get_meta_offset_from_object(&obj, sstable_store.store().as_ref()).await?;
147                let obj_id = SstableStore::get_object_id_from_path(&obj.key);
148                sst_dump_via_sstable_store(
149                    &sstable_store,
150                    obj_id,
151                    meta_offset,
152                    obj.total_size as u64,
153                    &table_data,
154                    &args,
155                )
156                .await?;
157            }
158        }
159    }
160
161    Ok(())
162}
163
164fn print_level(level: &Level, sst_info: &SstableInfo) {
165    println!("Level Type: {}", level.level_type.as_str_name());
166    println!("Level Idx: {}", level.level_idx);
167    if level.level_idx == 0 {
168        println!("L0 Sub-Level Idx: {}", level.sub_level_id);
169    }
170    println!("SST id: {}", sst_info.sst_id);
171    println!("SST table_ids: {:?}", sst_info.table_ids);
172}
173
174fn print_object(obj: &ObjectMetadata) {
175    println!("Object Key: {}", obj.key);
176    println!("Object Size: {}", obj.total_size);
177    println!("Object Last Modified: {}", obj.last_modified);
178}
179
180async fn get_meta_offset_from_object(
181    obj: &ObjectMetadata,
182    obj_store: &ObjectStoreImpl,
183) -> anyhow::Result<u64> {
184    let start = obj.total_size
185        - (
186            // version, magic
187            2 * std::mem::size_of::<u32>() +
188        // footer, checksum
189        2 * std::mem::size_of::<u64>()
190        );
191    let end = start + std::mem::size_of::<u64>();
192    Ok(obj_store.read(&obj.key, start..end).await?.get_u64_le())
193}
194
195pub async fn sst_dump_via_sstable_store(
196    sstable_store: &SstableStore,
197    object_id: HummockSstableObjectId,
198    meta_offset: u64,
199    file_size: u64,
200    table_data: &TableData,
201    args: &SstDumpArgs,
202) -> anyhow::Result<()> {
203    let sstable_info = SstableInfoInner {
204        object_id,
205        file_size,
206        meta_offset,
207        ..Default::default()
208    }
209    .into();
210    let sstable_cache = sstable_store
211        .sstable(&sstable_info, &mut StoreLocalStatistic::default())
212        .await?;
213    let sstable = sstable_cache.as_ref();
214    let sstable_meta = &sstable.meta;
215    let smallest_key = FullKey::decode(&sstable_meta.smallest_key);
216    let largest_key = FullKey::decode(&sstable_meta.largest_key);
217
218    println!("SST object id: {}", object_id);
219    println!("-------------------------------------");
220    println!("File Size: {}", sstable.estimate_size());
221
222    println!("Key Range:");
223    println!(
224        "\tleft:\t{:?}\n\tright:\t{:?}\n\t",
225        smallest_key, largest_key,
226    );
227
228    println!("Estimated Table Size: {}", sstable_meta.estimated_size);
229    println!("Bloom Filter Size: {}", sstable_meta.bloom_filter.len());
230    println!("Key Count: {}", sstable_meta.key_count);
231    println!("Version: {}", sstable_meta.version);
232
233    println!("Block Count: {}", sstable.block_count());
234    for i in 0..sstable.block_count() {
235        if let Some(block_id) = &args.block_id {
236            if *block_id == i as u64 {
237                print_block(i, table_data, sstable_store, sstable, args).await?;
238                return Ok(());
239            }
240        } else {
241            print_block(i, table_data, sstable_store, sstable, args).await?;
242        }
243    }
244    Ok(())
245}
246
247/// Determine all database tables and adds their information into a hash table with the table-ID as
248/// key.
249async fn load_table_schemas(meta_client: &MetaClient) -> anyhow::Result<TableData> {
250    let mut tables = HashMap::new();
251
252    let mvs = meta_client.risectl_list_state_tables().await?;
253    mvs.iter().for_each(|tbl| {
254        tables.insert(tbl.id, tbl.into());
255    });
256
257    Ok(tables)
258}
259
260/// Prints a block of a given SST including all contained KV-pairs.
261async fn print_block(
262    block_idx: usize,
263    table_data: &TableData,
264    sstable_store: &SstableStore,
265    sst: &Sstable,
266    args: &SstDumpArgs,
267) -> anyhow::Result<()> {
268    println!("\tBlock {}", block_idx);
269    println!("\t-----------");
270
271    let block_meta = &sst.meta.block_metas[block_idx];
272    let smallest_key = FullKey::decode(&block_meta.smallest_key);
273    let data_path = sstable_store.get_sst_data_path(sst.id);
274
275    // Retrieve encoded block data in bytes
276    let store = sstable_store.store();
277    let range = block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
278    let block_data = store.read(&data_path, range).await?;
279
280    // Retrieve checksum and compression algorithm used from the encoded block data
281    let len = block_data.len();
282    let checksum = (&block_data[len - 8..]).get_u64_le();
283    let compression = CompressionAlgorithm::decode(&mut &block_data[len - 9..len - 8])?;
284
285    println!(
286        "\tOffset: {}, Size: {}, Checksum: {}, Compression Algorithm: {:?}, Smallest Key: {:?}",
287        block_meta.offset, block_meta.len, checksum, compression, smallest_key
288    );
289
290    if args.print_entry {
291        print_kv_pairs(
292            block_data,
293            table_data,
294            block_meta.uncompressed_size as usize,
295            args,
296        )?;
297    }
298
299    Ok(())
300}
301
302/// Prints the data of KV-Pairs of a given block out to the terminal.
303fn print_kv_pairs(
304    block_data: Bytes,
305    table_data: &TableData,
306    uncompressed_capacity: usize,
307    args: &SstDumpArgs,
308) -> anyhow::Result<()> {
309    println!("\tKV-Pairs:");
310
311    let block = Box::new(Block::decode(block_data, uncompressed_capacity).unwrap());
312    let holder = BlockHolder::from_owned_block(block);
313    let mut block_iter = BlockIterator::new(holder);
314    block_iter.seek_to_first();
315
316    while block_iter.is_valid() {
317        let full_key = block_iter.key();
318        let full_val = block_iter.value();
319        let humm_val = HummockValue::from_slice(full_val)?;
320
321        let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
322        let date_time = DateTime::<Utc>::from(epoch.as_system_time());
323
324        println!("\t\t   key: {:?}, len={}", full_key, full_key.encoded_len());
325        println!("\t\t value: {:?}, len={}", humm_val, humm_val.encoded_len());
326        println!(
327            "\t\t epoch: {} offset = {}  ({})",
328            epoch,
329            full_key.epoch_with_gap.offset(),
330            date_time
331        );
332        if args.print_table {
333            print_table_column(full_key, humm_val, table_data)?;
334        }
335
336        println!();
337
338        block_iter.next();
339    }
340
341    Ok(())
342}
343
344/// If possible, prints information about the table, column, and stored value.
345fn print_table_column(
346    full_key: FullKey<&[u8]>,
347    humm_val: HummockValue<&[u8]>,
348    table_data: &TableData,
349) -> anyhow::Result<()> {
350    let table_id = full_key.user_key.table_id.table_id();
351
352    print!("\t\t table: id={}, ", table_id);
353    let table_catalog = match table_data.get(&table_id) {
354        None => {
355            // Table may have been dropped.
356            println!("(unknown)");
357            return Ok(());
358        }
359        Some(table) => table,
360    };
361    println!(
362        "name={}, version={:?}",
363        table_catalog.name,
364        table_catalog.version()
365    );
366
367    if let Some(user_val) = humm_val.into_user_value() {
368        let column_desc = table_catalog
369            .value_indices
370            .iter()
371            .map(|idx| table_catalog.columns[*idx].column_desc.name.clone())
372            .collect_vec();
373
374        let row_deserializer: EitherSerde = if table_catalog.version().is_some() {
375            ColumnAwareSerde::new(
376                table_catalog.value_indices.clone().into(),
377                Arc::from_iter(
378                    table_catalog
379                        .columns()
380                        .iter()
381                        .cloned()
382                        .map(|c| c.column_desc),
383                ),
384            )
385            .into()
386        } else {
387            BasicSerde::new(
388                table_catalog.value_indices.clone().into(),
389                Arc::from_iter(
390                    table_catalog
391                        .columns()
392                        .iter()
393                        .cloned()
394                        .map(|c| c.column_desc),
395                ),
396            )
397            .into()
398        };
399        let row = row_deserializer.deserialize(user_val)?;
400        for (c, v) in column_desc.iter().zip_eq_fast(row.iter()) {
401            println!(
402                "\t\tcolumn: {} {:?}",
403                c,
404                v.as_ref().map(|v| v.as_scalar_ref_impl().to_text())
405            );
406        }
407    }
408
409    Ok(())
410}