// risingwave_ctl/cmd_impl/hummock/sst_dump.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use chrono::DateTime;
20use chrono::offset::Utc;
21use clap::Args;
22use futures::TryStreamExt;
23use itertools::Itertools;
24use risingwave_common::types::ToText;
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::iter_util::ZipEqFast;
27use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
28use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
29use risingwave_frontend::TableCatalog;
30use risingwave_hummock_sdk::key::FullKey;
31use risingwave_hummock_sdk::level::Level;
32use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
33use risingwave_hummock_sdk::{HummockObjectId, HummockSstableObjectId};
34use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
35use risingwave_pb::id::TableId;
36use risingwave_rpc_client::MetaClient;
37use risingwave_storage::hummock::value::HummockValue;
38use risingwave_storage::hummock::{
39    Block, BlockHolder, BlockIterator, CompressionAlgorithm, Sstable, SstableStore,
40};
41use risingwave_storage::monitor::StoreLocalStatistic;
42use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
43
44use crate::CtlContext;
45use crate::common::HummockServiceOpts;
46
47type TableData = HashMap<TableId, TableCatalog>;
48
49#[derive(Args, Debug)]
50pub struct SstDumpArgs {
51    #[clap(short, long = "object-id")]
52    object_id: Option<u64>,
53    #[clap(short, long = "block-id")]
54    block_id: Option<u64>,
55    #[clap(short = 'p', long = "print-entry")]
56    print_entry: bool,
57    #[clap(short = 'l', long = "print-level")]
58    print_level: bool,
59    #[clap(short = 't', long = "print-table")]
60    print_table: bool,
61    #[clap(short = 'v', long = "print-vnode-stats")]
62    print_vnode_stats: bool,
63    #[clap(short = 'd')]
64    data_dir: Option<String>,
65    #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
66    use_new_object_prefix_strategy: bool,
67}
68
69pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
70    println!("Start sst dump with args: {:?}", args);
71    let table_data = if args.print_entry && args.print_table {
72        let meta_client = context.meta_client().await?;
73        load_table_schemas(&meta_client).await?
74    } else {
75        TableData::default()
76    };
77    if args.print_level {
78        // Level information is retrieved from meta service
79        let hummock = context
80            .hummock_store(HummockServiceOpts::from_env(
81                args.data_dir.clone(),
82                args.use_new_object_prefix_strategy,
83            )?)
84            .await?;
85        let version = hummock.inner().get_pinned_version().clone();
86        let sstable_store = hummock.sstable_store();
87        for level in version.get_combined_levels() {
88            for sstable_info in &level.table_infos {
89                if let Some(object_id) = &args.object_id {
90                    if *object_id == sstable_info.object_id {
91                        print_level(level, sstable_info, &args);
92                        sst_dump_via_sstable_store(
93                            &sstable_store,
94                            sstable_info.object_id,
95                            sstable_info.meta_offset,
96                            sstable_info.file_size,
97                            &table_data,
98                            &args,
99                        )
100                        .await?;
101                        return Ok(());
102                    }
103                } else {
104                    print_level(level, sstable_info, &args);
105                    sst_dump_via_sstable_store(
106                        &sstable_store,
107                        sstable_info.object_id,
108                        sstable_info.meta_offset,
109                        sstable_info.file_size,
110                        &table_data,
111                        &args,
112                    )
113                    .await?;
114                }
115            }
116        }
117    } else {
118        // Object information is retrieved from object store. Meta service is not required.
119        let hummock_service_opts = HummockServiceOpts::from_env(
120            args.data_dir.clone(),
121            args.use_new_object_prefix_strategy,
122        )?;
123        let sstable_store = hummock_service_opts
124            .create_sstable_store(args.use_new_object_prefix_strategy)
125            .await?;
126        if let Some(obj_id) = &args.object_id {
127            let obj_store = sstable_store.store();
128            let obj_path = sstable_store.get_sst_data_path(*obj_id);
129            let obj = obj_store.metadata(&obj_path).await?;
130            print_object(&obj);
131            let meta_offset = get_meta_offset_from_object(&obj, obj_store.as_ref()).await?;
132            sst_dump_via_sstable_store(
133                &sstable_store,
134                (*obj_id).into(),
135                meta_offset,
136                obj.total_size as u64,
137                &table_data,
138                &args,
139            )
140            .await?;
141        } else {
142            let mut metadata_iter = sstable_store
143                .list_sst_object_metadata_from_object_store(None, None, None)
144                .await?;
145            while let Some(obj) = metadata_iter.try_next().await? {
146                print_object(&obj);
147                let obj_id = SstableStore::get_object_id_from_path(&obj.key);
148                let obj_id = match obj_id {
149                    HummockObjectId::Sstable(obj_id) => obj_id,
150                    HummockObjectId::VectorFile(_) | HummockObjectId::HnswGraphFile(_) => {
151                        println!(
152                            "object id {:?} not a sstable object id: {}. skip",
153                            obj_id, obj.key
154                        );
155                        continue;
156                    }
157                };
158                let meta_offset =
159                    get_meta_offset_from_object(&obj, sstable_store.store().as_ref()).await?;
160                sst_dump_via_sstable_store(
161                    &sstable_store,
162                    obj_id,
163                    meta_offset,
164                    obj.total_size as u64,
165                    &table_data,
166                    &args,
167                )
168                .await?;
169            }
170        }
171    }
172
173    Ok(())
174}
175
176fn print_level(level: &Level, sst_info: &SstableInfo, args: &SstDumpArgs) {
177    println!("Level Type: {}", level.level_type.as_str_name());
178    println!("Level Idx: {}", level.level_idx);
179    if level.level_idx == 0 {
180        println!("L0 Sub-Level Idx: {}", level.sub_level_id);
181    }
182    println!("SST id: {}", sst_info.sst_id);
183    println!("SST table_ids: {:?}", sst_info.table_ids);
184    if args.print_vnode_stats {
185        if let Some(ref stats) = sst_info.vnode_statistics {
186            println!("Vnode Statistics: {:?}", stats);
187        } else {
188            println!("Vnode Statistics: None");
189        }
190    }
191}
192
193fn print_object(obj: &ObjectMetadata) {
194    println!("Object Key: {}", obj.key);
195    println!("Object Size: {}", obj.total_size);
196    println!("Object Last Modified: {}", obj.last_modified);
197}
198
199async fn get_meta_offset_from_object(
200    obj: &ObjectMetadata,
201    obj_store: &ObjectStoreImpl,
202) -> anyhow::Result<u64> {
203    let start = obj.total_size
204        - (
205            // version, magic
206            2 * std::mem::size_of::<u32>() +
207        // footer, checksum
208        2 * std::mem::size_of::<u64>()
209        );
210    let end = start + std::mem::size_of::<u64>();
211    Ok(obj_store.read(&obj.key, start..end).await?.get_u64_le())
212}
213
214pub async fn sst_dump_via_sstable_store(
215    sstable_store: &SstableStore,
216    object_id: HummockSstableObjectId,
217    meta_offset: u64,
218    file_size: u64,
219    table_data: &TableData,
220    args: &SstDumpArgs,
221) -> anyhow::Result<()> {
222    let sstable_info = SstableInfoInner {
223        object_id,
224        file_size,
225        meta_offset,
226        // below are default unused value
227        sst_id: 0.into(),
228        key_range: Default::default(),
229        table_ids: vec![],
230        stale_key_count: 0,
231        total_key_count: 0,
232        min_epoch: 0,
233        max_epoch: 0,
234        uncompressed_file_size: 0,
235        range_tombstone_count: 0,
236        bloom_filter_kind: Default::default(),
237        sst_size: 0,
238        vnode_statistics: None,
239    }
240    .into();
241    let sstable_cache = sstable_store
242        .sstable(&sstable_info, &mut StoreLocalStatistic::default())
243        .await?;
244    let sstable = sstable_cache.as_ref();
245    let sstable_meta = &sstable.meta;
246    let smallest_key = FullKey::decode(&sstable_meta.smallest_key);
247    let largest_key = FullKey::decode(&sstable_meta.largest_key);
248
249    println!("SST object id: {}", object_id);
250    println!("-------------------------------------");
251    println!("File Size: {}", sstable.estimate_size());
252
253    println!("Key Range:");
254    println!(
255        "\tleft:\t{:?}\n\tright:\t{:?}\n\t",
256        smallest_key, largest_key,
257    );
258
259    println!("Estimated Table Size: {}", sstable_meta.estimated_size);
260    println!("Bloom Filter Size: {}", sstable_meta.bloom_filter.len());
261    println!("Key Count: {}", sstable_meta.key_count);
262    println!("Version: {}", sstable_meta.version);
263
264    println!("Block Count: {}", sstable.block_count());
265    for i in 0..sstable.block_count() {
266        if let Some(block_id) = &args.block_id {
267            if *block_id == i as u64 {
268                print_block(i, table_data, sstable_store, sstable, args).await?;
269                return Ok(());
270            }
271        } else {
272            print_block(i, table_data, sstable_store, sstable, args).await?;
273        }
274    }
275    Ok(())
276}
277
278/// Determine all database tables and adds their information into a hash table with the table-ID as
279/// key.
280async fn load_table_schemas(meta_client: &MetaClient) -> anyhow::Result<TableData> {
281    let mut tables = HashMap::new();
282
283    let mvs = meta_client.risectl_list_state_tables().await?;
284    mvs.iter().for_each(|tbl| {
285        tables.insert(tbl.id, tbl.into());
286    });
287
288    Ok(tables)
289}
290
291/// Prints a block of a given SST including all contained KV-pairs.
292async fn print_block(
293    block_idx: usize,
294    table_data: &TableData,
295    sstable_store: &SstableStore,
296    sst: &Sstable,
297    args: &SstDumpArgs,
298) -> anyhow::Result<()> {
299    println!("\tBlock {}", block_idx);
300    println!("\t-----------");
301
302    let block_meta = &sst.meta.block_metas[block_idx];
303    let smallest_key = FullKey::decode(&block_meta.smallest_key);
304    let data_path = sstable_store.get_sst_data_path(sst.id);
305
306    // Retrieve encoded block data in bytes
307    let store = sstable_store.store();
308    let range = block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
309    let block_data = store.read(&data_path, range).await?;
310
311    // Retrieve checksum and compression algorithm used from the encoded block data
312    let len = block_data.len();
313    let checksum = (&block_data[len - 8..]).get_u64_le();
314    let compression = CompressionAlgorithm::decode(&mut &block_data[len - 9..len - 8])?;
315
316    println!(
317        "\tOffset: {}, Size: {}, Checksum: {}, Compression Algorithm: {:?}, Smallest Key: {:?}",
318        block_meta.offset, block_meta.len, checksum, compression, smallest_key
319    );
320
321    if args.print_entry {
322        print_kv_pairs(
323            block_data,
324            table_data,
325            block_meta.uncompressed_size as usize,
326            args,
327        )?;
328    }
329
330    Ok(())
331}
332
333/// Prints the data of KV-Pairs of a given block out to the terminal.
334fn print_kv_pairs(
335    block_data: Bytes,
336    table_data: &TableData,
337    uncompressed_capacity: usize,
338    args: &SstDumpArgs,
339) -> anyhow::Result<()> {
340    println!("\tKV-Pairs:");
341
342    let block = Box::new(Block::decode(block_data, uncompressed_capacity).unwrap());
343    let holder = BlockHolder::from_owned_block(block);
344    let mut block_iter = BlockIterator::new(holder);
345    block_iter.seek_to_first();
346
347    while block_iter.is_valid() {
348        let full_key = block_iter.key();
349        let full_val = block_iter.value();
350        let humm_val = HummockValue::from_slice(full_val)?;
351
352        let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
353        let date_time = DateTime::<Utc>::from(epoch.as_system_time());
354
355        println!("\t\t   key: {:?}, len={}", full_key, full_key.encoded_len());
356        println!("\t\t value: {:?}, len={}", humm_val, humm_val.encoded_len());
357        println!(
358            "\t\t epoch: {} offset = {}  ({})",
359            epoch,
360            full_key.epoch_with_gap.offset(),
361            date_time
362        );
363        if args.print_table {
364            print_table_column(full_key, humm_val, table_data)?;
365        }
366
367        println!();
368
369        block_iter.next();
370    }
371
372    Ok(())
373}
374
375/// If possible, prints information about the table, column, and stored value.
376fn print_table_column(
377    full_key: FullKey<&[u8]>,
378    humm_val: HummockValue<&[u8]>,
379    table_data: &TableData,
380) -> anyhow::Result<()> {
381    let table_id = full_key.user_key.table_id;
382
383    print!("\t\t table: id={}, ", table_id);
384    let table_catalog = match table_data.get(&table_id) {
385        None => {
386            // Table may have been dropped.
387            println!("(unknown)");
388            return Ok(());
389        }
390        Some(table) => table,
391    };
392    println!(
393        "name={}, version={:?}",
394        table_catalog.name,
395        table_catalog.version()
396    );
397
398    if let Some(user_val) = humm_val.into_user_value() {
399        let column_desc = table_catalog
400            .value_indices
401            .iter()
402            .map(|idx| table_catalog.columns[*idx].column_desc.name.clone())
403            .collect_vec();
404
405        let row_deserializer: EitherSerde = if table_catalog.version().is_some() {
406            ColumnAwareSerde::new(
407                table_catalog.value_indices.clone().into(),
408                Arc::from_iter(
409                    table_catalog
410                        .columns()
411                        .iter()
412                        .cloned()
413                        .map(|c| c.column_desc),
414                ),
415            )
416            .into()
417        } else {
418            BasicSerde::new(
419                table_catalog.value_indices.clone().into(),
420                Arc::from_iter(
421                    table_catalog
422                        .columns()
423                        .iter()
424                        .cloned()
425                        .map(|c| c.column_desc),
426                ),
427            )
428            .into()
429        };
430        let row = row_deserializer.deserialize(user_val)?;
431        for (c, v) in column_desc.iter().zip_eq_fast(row.iter()) {
432            println!(
433                "\t\tcolumn: {} {:?}",
434                c,
435                v.as_ref().map(|v| v.as_scalar_ref_impl().to_text())
436            );
437        }
438    }
439
440    Ok(())
441}