Skip to main content

risingwave_ctl/cmd_impl/hummock/
sst_dump.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use chrono::DateTime;
20use chrono::offset::Utc;
21use clap::Args;
22use futures::TryStreamExt;
23use itertools::Itertools;
24use risingwave_common::types::ToText;
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::iter_util::ZipEqFast;
27use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
28use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
29use risingwave_frontend::TableCatalog;
30use risingwave_hummock_sdk::key::FullKey;
31use risingwave_hummock_sdk::level::Level;
32use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
33use risingwave_hummock_sdk::{HummockObjectId, HummockSstableObjectId};
34use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
35use risingwave_pb::hummock::{PbSstableFilterLayout, PbSstableFilterType};
36use risingwave_pb::id::TableId;
37use risingwave_rpc_client::MetaClient;
38use risingwave_storage::hummock::value::HummockValue;
39use risingwave_storage::hummock::{
40    Block, BlockHolder, BlockIterator, CompressionAlgorithm, Sstable, SstableStore,
41};
42use risingwave_storage::monitor::StoreLocalStatistic;
43use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
44
45use crate::CtlContext;
46use crate::common::HummockServiceOpts;
47
/// Table catalogs keyed by table id; used to decode stored values into named columns.
type TableData = HashMap<TableId, TableCatalog>;
49
50#[derive(Args, Debug)]
51pub struct SstDumpArgs {
52    #[clap(short, long = "object-id")]
53    object_id: Option<u64>,
54    #[clap(short, long = "block-id")]
55    block_id: Option<u64>,
56    #[clap(short = 'p', long = "print-entry")]
57    print_entry: bool,
58    #[clap(short = 'l', long = "print-level")]
59    print_level: bool,
60    #[clap(short = 't', long = "print-table")]
61    print_table: bool,
62    #[clap(short = 'v', long = "print-vnode-stats")]
63    print_vnode_stats: bool,
64    #[clap(short = 'd')]
65    data_dir: Option<String>,
66    #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
67    use_new_object_prefix_strategy: bool,
68}
69
70pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
71    println!("Start sst dump with args: {:?}", args);
72    let table_data = if args.print_entry && args.print_table {
73        let meta_client = context.meta_client().await?;
74        load_table_schemas(&meta_client).await?
75    } else {
76        TableData::default()
77    };
78    if args.print_level {
79        // Level information is retrieved from meta service
80        let hummock = context
81            .hummock_store(HummockServiceOpts::from_env(
82                args.data_dir.clone(),
83                args.use_new_object_prefix_strategy,
84            )?)
85            .await?;
86        let version = hummock.inner().get_pinned_version().clone();
87        let sstable_store = hummock.sstable_store();
88        for level in version.get_combined_levels() {
89            for sstable_info in &level.table_infos {
90                if let Some(object_id) = &args.object_id {
91                    if *object_id == sstable_info.object_id {
92                        print_level(level, sstable_info, &args);
93                        sst_dump_via_sstable_store(
94                            &sstable_store,
95                            sstable_info.object_id,
96                            sstable_info.meta_offset,
97                            sstable_info.file_size,
98                            &table_data,
99                            &args,
100                        )
101                        .await?;
102                        return Ok(());
103                    }
104                } else {
105                    print_level(level, sstable_info, &args);
106                    sst_dump_via_sstable_store(
107                        &sstable_store,
108                        sstable_info.object_id,
109                        sstable_info.meta_offset,
110                        sstable_info.file_size,
111                        &table_data,
112                        &args,
113                    )
114                    .await?;
115                }
116            }
117        }
118    } else {
119        // Object information is retrieved from object store. Meta service is not required.
120        let hummock_service_opts = HummockServiceOpts::from_env(
121            args.data_dir.clone(),
122            args.use_new_object_prefix_strategy,
123        )?;
124        let sstable_store = hummock_service_opts
125            .create_sstable_store(args.use_new_object_prefix_strategy)
126            .await?;
127        if let Some(obj_id) = &args.object_id {
128            let obj_store = sstable_store.store();
129            let obj_path = sstable_store.get_sst_data_path(*obj_id);
130            let obj = obj_store.metadata(&obj_path).await?;
131            print_object(&obj);
132            let meta_offset = get_meta_offset_from_object(&obj, obj_store.as_ref()).await?;
133            sst_dump_via_sstable_store(
134                &sstable_store,
135                (*obj_id).into(),
136                meta_offset,
137                obj.total_size as u64,
138                &table_data,
139                &args,
140            )
141            .await?;
142        } else {
143            let mut metadata_iter = sstable_store
144                .list_sst_object_metadata_from_object_store(None, None, None)
145                .await?;
146            while let Some(obj) = metadata_iter.try_next().await? {
147                print_object(&obj);
148                let obj_id = SstableStore::get_object_id_from_path(&obj.key);
149                let obj_id = match obj_id {
150                    HummockObjectId::Sstable(obj_id) => obj_id,
151                    HummockObjectId::VectorFile(_) | HummockObjectId::HnswGraphFile(_) => {
152                        println!(
153                            "object id {:?} not a sstable object id: {}. skip",
154                            obj_id, obj.key
155                        );
156                        continue;
157                    }
158                };
159                let meta_offset =
160                    get_meta_offset_from_object(&obj, sstable_store.store().as_ref()).await?;
161                sst_dump_via_sstable_store(
162                    &sstable_store,
163                    obj_id,
164                    meta_offset,
165                    obj.total_size as u64,
166                    &table_data,
167                    &args,
168                )
169                .await?;
170            }
171        }
172    }
173
174    Ok(())
175}
176
177fn print_level(level: &Level, sst_info: &SstableInfo, args: &SstDumpArgs) {
178    println!("Level Type: {}", level.level_type.as_str_name());
179    println!("Level Idx: {}", level.level_idx);
180    if level.level_idx == 0 {
181        println!("L0 Sub-Level Idx: {}", level.sub_level_id);
182    }
183    println!("SST id: {}", sst_info.sst_id);
184    println!("SST table_ids: {:?}", sst_info.table_ids);
185    if args.print_vnode_stats {
186        if let Some(ref stats) = sst_info.vnode_statistics {
187            println!("Vnode Statistics: {:?}", stats);
188        } else {
189            println!("Vnode Statistics: None");
190        }
191    }
192}
193
194fn print_object(obj: &ObjectMetadata) {
195    println!("Object Key: {}", obj.key);
196    println!("Object Size: {}", obj.total_size);
197    println!("Object Last Modified: {}", obj.last_modified);
198}
199
200async fn get_meta_offset_from_object(
201    obj: &ObjectMetadata,
202    obj_store: &ObjectStoreImpl,
203) -> anyhow::Result<u64> {
204    let start = obj.total_size
205        - (
206            // version, magic
207            2 * std::mem::size_of::<u32>() +
208        // footer, checksum
209        2 * std::mem::size_of::<u64>()
210        );
211    let end = start + std::mem::size_of::<u64>();
212    Ok(obj_store.read(&obj.key, start..end).await?.get_u64_le())
213}
214
215pub async fn sst_dump_via_sstable_store(
216    sstable_store: &SstableStore,
217    object_id: HummockSstableObjectId,
218    meta_offset: u64,
219    file_size: u64,
220    table_data: &TableData,
221    args: &SstDumpArgs,
222) -> anyhow::Result<()> {
223    let sstable_info = SstableInfoInner {
224        object_id,
225        file_size,
226        meta_offset,
227        // below are default unused value
228        sst_id: 0.into(),
229        key_range: Default::default(),
230        table_ids: vec![],
231        stale_key_count: 0,
232        total_key_count: 0,
233        min_epoch: 0,
234        max_epoch: 0,
235        uncompressed_file_size: 0,
236        range_tombstone_count: 0,
237        filter_type: PbSstableFilterType::SstableFilterNone,
238        filter_layout: PbSstableFilterLayout::Unspecified,
239        sst_size: 0,
240        vnode_statistics: None,
241    }
242    .into();
243    let sstable_cache = sstable_store
244        .sstable(&sstable_info, &mut StoreLocalStatistic::default())
245        .await?;
246    let sstable = sstable_cache.as_ref();
247    let sstable_meta = &sstable.meta;
248    let smallest_key = FullKey::decode(&sstable_meta.smallest_key);
249    let largest_key = FullKey::decode(&sstable_meta.largest_key);
250
251    println!("SST object id: {}", object_id);
252    println!("-------------------------------------");
253    println!("File Size: {}", sstable_meta.estimated_size);
254
255    println!("Key Range:");
256    println!(
257        "\tleft:\t{:?}\n\tright:\t{:?}\n\t",
258        smallest_key, largest_key,
259    );
260
261    println!("Estimated Table Size: {}", sstable_meta.estimated_size);
262    println!("Bloom Filter Size: {}", sstable_meta.bloom_filter.len());
263    println!("Key Count: {}", sstable_meta.key_count);
264    println!("Version: {}", sstable_meta.version);
265
266    println!("Block Count: {}", sstable.block_count());
267    for i in 0..sstable.block_count() {
268        if let Some(block_id) = &args.block_id {
269            if *block_id == i as u64 {
270                print_block(i, table_data, sstable_store, sstable, args).await?;
271                return Ok(());
272            }
273        } else {
274            print_block(i, table_data, sstable_store, sstable, args).await?;
275        }
276    }
277    Ok(())
278}
279
280/// Determine all database tables and adds their information into a hash table with the table-ID as
281/// key.
282async fn load_table_schemas(meta_client: &MetaClient) -> anyhow::Result<TableData> {
283    let mut tables = HashMap::new();
284
285    let mvs = meta_client.risectl_list_state_tables().await?;
286    mvs.iter().for_each(|tbl| {
287        tables.insert(tbl.id, tbl.into());
288    });
289
290    Ok(tables)
291}
292
293/// Prints a block of a given SST including all contained KV-pairs.
294async fn print_block(
295    block_idx: usize,
296    table_data: &TableData,
297    sstable_store: &SstableStore,
298    sst: &Sstable,
299    args: &SstDumpArgs,
300) -> anyhow::Result<()> {
301    println!("\tBlock {}", block_idx);
302    println!("\t-----------");
303
304    let block_meta = &sst.meta.block_metas[block_idx];
305    let smallest_key = FullKey::decode(&block_meta.smallest_key);
306    let data_path = sstable_store.get_sst_data_path(sst.id);
307
308    // Retrieve encoded block data in bytes
309    let store = sstable_store.store();
310    let range = block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
311    let block_data = store.read(&data_path, range).await?;
312
313    // Retrieve checksum and compression algorithm used from the encoded block data
314    let len = block_data.len();
315    let checksum = (&block_data[len - 8..]).get_u64_le();
316    let compression = CompressionAlgorithm::decode(&mut &block_data[len - 9..len - 8])?;
317
318    println!(
319        "\tOffset: {}, Size: {}, Checksum: {}, Compression Algorithm: {:?}, Smallest Key: {:?}",
320        block_meta.offset, block_meta.len, checksum, compression, smallest_key
321    );
322
323    if args.print_entry {
324        print_kv_pairs(
325            block_data,
326            table_data,
327            block_meta.uncompressed_size as usize,
328            args,
329        )?;
330    }
331
332    Ok(())
333}
334
335/// Prints the data of KV-Pairs of a given block out to the terminal.
336fn print_kv_pairs(
337    block_data: Bytes,
338    table_data: &TableData,
339    uncompressed_capacity: usize,
340    args: &SstDumpArgs,
341) -> anyhow::Result<()> {
342    println!("\tKV-Pairs:");
343
344    let block = Box::new(Block::decode(block_data, uncompressed_capacity).unwrap());
345    let holder = BlockHolder::from_owned_block(block);
346    let mut block_iter = BlockIterator::new(holder);
347    block_iter.seek_to_first();
348
349    while block_iter.is_valid() {
350        let full_key = block_iter.key();
351        let full_val = block_iter.value();
352        let humm_val = HummockValue::from_slice(full_val)?;
353
354        let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
355        let date_time = DateTime::<Utc>::from(epoch.as_system_time());
356
357        println!("\t\t   key: {:?}, len={}", full_key, full_key.encoded_len());
358        println!("\t\t value: {:?}, len={}", humm_val, humm_val.encoded_len());
359        println!(
360            "\t\t epoch: {} offset = {}  ({})",
361            epoch,
362            full_key.epoch_with_gap.offset(),
363            date_time
364        );
365        if args.print_table {
366            print_table_column(full_key, humm_val, table_data)?;
367        }
368
369        println!();
370
371        block_iter.next();
372    }
373
374    Ok(())
375}
376
377/// If possible, prints information about the table, column, and stored value.
378fn print_table_column(
379    full_key: FullKey<&[u8]>,
380    humm_val: HummockValue<&[u8]>,
381    table_data: &TableData,
382) -> anyhow::Result<()> {
383    let table_id = full_key.user_key.table_id;
384
385    print!("\t\t table: id={}, ", table_id);
386    let table_catalog = match table_data.get(&table_id) {
387        None => {
388            // Table may have been dropped.
389            println!("(unknown)");
390            return Ok(());
391        }
392        Some(table) => table,
393    };
394    println!(
395        "name={}, version={:?}",
396        table_catalog.name,
397        table_catalog.version()
398    );
399
400    if let Some(user_val) = humm_val.into_user_value() {
401        let column_desc = table_catalog
402            .value_indices
403            .iter()
404            .map(|idx| table_catalog.columns[*idx].column_desc.name.clone())
405            .collect_vec();
406
407        let row_deserializer: EitherSerde = if table_catalog.version().is_some() {
408            ColumnAwareSerde::new(
409                table_catalog.value_indices.clone().into(),
410                Arc::from_iter(
411                    table_catalog
412                        .columns()
413                        .iter()
414                        .cloned()
415                        .map(|c| c.column_desc),
416                ),
417            )
418            .into()
419        } else {
420            BasicSerde::new(
421                table_catalog.value_indices.clone().into(),
422                Arc::from_iter(
423                    table_catalog
424                        .columns()
425                        .iter()
426                        .cloned()
427                        .map(|c| c.column_desc),
428                ),
429            )
430            .into()
431        };
432        let row = row_deserializer.deserialize(user_val)?;
433        for (c, v) in column_desc.iter().zip_eq_fast(row.iter()) {
434            println!(
435                "\t\tcolumn: {} {:?}",
436                c,
437                v.as_ref().map(|v| v.as_scalar_ref_impl().to_text())
438            );
439        }
440    }
441
442    Ok(())
443}