risingwave_ctl/cmd_impl/hummock/
sst_dump.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use chrono::DateTime;
20use chrono::offset::Utc;
21use clap::Args;
22use futures::TryStreamExt;
23use itertools::Itertools;
24use risingwave_common::types::ToText;
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::iter_util::ZipEqFast;
27use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
28use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
29use risingwave_frontend::TableCatalog;
30use risingwave_hummock_sdk::key::FullKey;
31use risingwave_hummock_sdk::level::Level;
32use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
33use risingwave_hummock_sdk::{HummockObjectId, HummockSstableObjectId};
34use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
35use risingwave_pb::id::TableId;
36use risingwave_rpc_client::MetaClient;
37use risingwave_storage::hummock::value::HummockValue;
38use risingwave_storage::hummock::{
39    Block, BlockHolder, BlockIterator, CompressionAlgorithm, Sstable, SstableStore,
40};
41use risingwave_storage::monitor::StoreLocalStatistic;
42use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
43
44use crate::CtlContext;
45use crate::common::HummockServiceOpts;
46
/// Catalogs of the known tables, keyed by table id. Used to decode stored rows into named columns
/// when printing entries.
type TableData = HashMap<TableId, TableCatalog>;
48
// Command-line arguments accepted by `sst_dump`.
//
// Plain `//` comments are used on purpose: clap's derive macros turn `///` doc comments into help
// text, which would change the program's output.
#[derive(Args, Debug)]
pub struct SstDumpArgs {
    // Restrict the dump to the SST object with this id; when absent every object is dumped.
    #[clap(short, long = "object-id")]
    object_id: Option<u64>,
    // Restrict the dump of each SST to the block with this index; when absent every block is
    // dumped.
    #[clap(short, long = "block-id")]
    block_id: Option<u64>,
    // Also print the key-value pairs stored in each dumped block.
    #[clap(short = 'p', long = "print-entry")]
    print_entry: bool,
    // Walk the SSTs through the levels of the pinned Hummock version (requires the meta service)
    // instead of reading objects straight from the object store.
    #[clap(short = 'l', long = "print-level")]
    print_level: bool,
    // Together with `print_entry`: load table schemas from the meta service and print the decoded
    // columns of every entry.
    #[clap(short = 't', long = "print-table")]
    print_table: bool,
    // Forwarded to `HummockServiceOpts::from_env`.
    #[clap(short = 'd')]
    data_dir: Option<String>,
    // Forwarded to `HummockServiceOpts::from_env` and `create_sstable_store`.
    // NOTE(review): a `bool` flag with `default_value = "true"` presumably cannot be switched off
    // from the command line — confirm against clap's derive semantics.
    #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
    use_new_object_prefix_strategy: bool,
}
66
67pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
68    println!("Start sst dump with args: {:?}", args);
69    let table_data = if args.print_entry && args.print_table {
70        let meta_client = context.meta_client().await?;
71        load_table_schemas(&meta_client).await?
72    } else {
73        TableData::default()
74    };
75    if args.print_level {
76        // Level information is retrieved from meta service
77        let hummock = context
78            .hummock_store(HummockServiceOpts::from_env(
79                args.data_dir.clone(),
80                args.use_new_object_prefix_strategy,
81            )?)
82            .await?;
83        let version = hummock.inner().get_pinned_version().clone();
84        let sstable_store = hummock.sstable_store();
85        for level in version.get_combined_levels() {
86            for sstable_info in &level.table_infos {
87                if let Some(object_id) = &args.object_id {
88                    if *object_id == sstable_info.object_id {
89                        print_level(level, sstable_info);
90                        sst_dump_via_sstable_store(
91                            &sstable_store,
92                            sstable_info.object_id,
93                            sstable_info.meta_offset,
94                            sstable_info.file_size,
95                            &table_data,
96                            &args,
97                        )
98                        .await?;
99                        return Ok(());
100                    }
101                } else {
102                    print_level(level, sstable_info);
103                    sst_dump_via_sstable_store(
104                        &sstable_store,
105                        sstable_info.object_id,
106                        sstable_info.meta_offset,
107                        sstable_info.file_size,
108                        &table_data,
109                        &args,
110                    )
111                    .await?;
112                }
113            }
114        }
115    } else {
116        // Object information is retrieved from object store. Meta service is not required.
117        let hummock_service_opts = HummockServiceOpts::from_env(
118            args.data_dir.clone(),
119            args.use_new_object_prefix_strategy,
120        )?;
121        let sstable_store = hummock_service_opts
122            .create_sstable_store(args.use_new_object_prefix_strategy)
123            .await?;
124        if let Some(obj_id) = &args.object_id {
125            let obj_store = sstable_store.store();
126            let obj_path = sstable_store.get_sst_data_path(*obj_id);
127            let obj = obj_store.metadata(&obj_path).await?;
128            print_object(&obj);
129            let meta_offset = get_meta_offset_from_object(&obj, obj_store.as_ref()).await?;
130            sst_dump_via_sstable_store(
131                &sstable_store,
132                (*obj_id).into(),
133                meta_offset,
134                obj.total_size as u64,
135                &table_data,
136                &args,
137            )
138            .await?;
139        } else {
140            let mut metadata_iter = sstable_store
141                .list_sst_object_metadata_from_object_store(None, None, None)
142                .await?;
143            while let Some(obj) = metadata_iter.try_next().await? {
144                print_object(&obj);
145                let obj_id = SstableStore::get_object_id_from_path(&obj.key);
146                let obj_id = match obj_id {
147                    HummockObjectId::Sstable(obj_id) => obj_id,
148                    HummockObjectId::VectorFile(_) | HummockObjectId::HnswGraphFile(_) => {
149                        println!(
150                            "object id {:?} not a sstable object id: {}. skip",
151                            obj_id, obj.key
152                        );
153                        continue;
154                    }
155                };
156                let meta_offset =
157                    get_meta_offset_from_object(&obj, sstable_store.store().as_ref()).await?;
158                sst_dump_via_sstable_store(
159                    &sstable_store,
160                    obj_id,
161                    meta_offset,
162                    obj.total_size as u64,
163                    &table_data,
164                    &args,
165                )
166                .await?;
167            }
168        }
169    }
170
171    Ok(())
172}
173
174fn print_level(level: &Level, sst_info: &SstableInfo) {
175    println!("Level Type: {}", level.level_type.as_str_name());
176    println!("Level Idx: {}", level.level_idx);
177    if level.level_idx == 0 {
178        println!("L0 Sub-Level Idx: {}", level.sub_level_id);
179    }
180    println!("SST id: {}", sst_info.sst_id);
181    println!("SST table_ids: {:?}", sst_info.table_ids);
182}
183
184fn print_object(obj: &ObjectMetadata) {
185    println!("Object Key: {}", obj.key);
186    println!("Object Size: {}", obj.total_size);
187    println!("Object Last Modified: {}", obj.last_modified);
188}
189
190async fn get_meta_offset_from_object(
191    obj: &ObjectMetadata,
192    obj_store: &ObjectStoreImpl,
193) -> anyhow::Result<u64> {
194    let start = obj.total_size
195        - (
196            // version, magic
197            2 * std::mem::size_of::<u32>() +
198        // footer, checksum
199        2 * std::mem::size_of::<u64>()
200        );
201    let end = start + std::mem::size_of::<u64>();
202    Ok(obj_store.read(&obj.key, start..end).await?.get_u64_le())
203}
204
205pub async fn sst_dump_via_sstable_store(
206    sstable_store: &SstableStore,
207    object_id: HummockSstableObjectId,
208    meta_offset: u64,
209    file_size: u64,
210    table_data: &TableData,
211    args: &SstDumpArgs,
212) -> anyhow::Result<()> {
213    let sstable_info = SstableInfoInner {
214        object_id,
215        file_size,
216        meta_offset,
217        // below are default unused value
218        sst_id: 0.into(),
219        key_range: Default::default(),
220        table_ids: vec![],
221        stale_key_count: 0,
222        total_key_count: 0,
223        min_epoch: 0,
224        max_epoch: 0,
225        uncompressed_file_size: 0,
226        range_tombstone_count: 0,
227        bloom_filter_kind: Default::default(),
228        sst_size: 0,
229    }
230    .into();
231    let sstable_cache = sstable_store
232        .sstable(&sstable_info, &mut StoreLocalStatistic::default())
233        .await?;
234    let sstable = sstable_cache.as_ref();
235    let sstable_meta = &sstable.meta;
236    let smallest_key = FullKey::decode(&sstable_meta.smallest_key);
237    let largest_key = FullKey::decode(&sstable_meta.largest_key);
238
239    println!("SST object id: {}", object_id);
240    println!("-------------------------------------");
241    println!("File Size: {}", sstable.estimate_size());
242
243    println!("Key Range:");
244    println!(
245        "\tleft:\t{:?}\n\tright:\t{:?}\n\t",
246        smallest_key, largest_key,
247    );
248
249    println!("Estimated Table Size: {}", sstable_meta.estimated_size);
250    println!("Bloom Filter Size: {}", sstable_meta.bloom_filter.len());
251    println!("Key Count: {}", sstable_meta.key_count);
252    println!("Version: {}", sstable_meta.version);
253
254    println!("Block Count: {}", sstable.block_count());
255    for i in 0..sstable.block_count() {
256        if let Some(block_id) = &args.block_id {
257            if *block_id == i as u64 {
258                print_block(i, table_data, sstable_store, sstable, args).await?;
259                return Ok(());
260            }
261        } else {
262            print_block(i, table_data, sstable_store, sstable, args).await?;
263        }
264    }
265    Ok(())
266}
267
268/// Determine all database tables and adds their information into a hash table with the table-ID as
269/// key.
270async fn load_table_schemas(meta_client: &MetaClient) -> anyhow::Result<TableData> {
271    let mut tables = HashMap::new();
272
273    let mvs = meta_client.risectl_list_state_tables().await?;
274    mvs.iter().for_each(|tbl| {
275        tables.insert(tbl.id, tbl.into());
276    });
277
278    Ok(tables)
279}
280
281/// Prints a block of a given SST including all contained KV-pairs.
282async fn print_block(
283    block_idx: usize,
284    table_data: &TableData,
285    sstable_store: &SstableStore,
286    sst: &Sstable,
287    args: &SstDumpArgs,
288) -> anyhow::Result<()> {
289    println!("\tBlock {}", block_idx);
290    println!("\t-----------");
291
292    let block_meta = &sst.meta.block_metas[block_idx];
293    let smallest_key = FullKey::decode(&block_meta.smallest_key);
294    let data_path = sstable_store.get_sst_data_path(sst.id);
295
296    // Retrieve encoded block data in bytes
297    let store = sstable_store.store();
298    let range = block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
299    let block_data = store.read(&data_path, range).await?;
300
301    // Retrieve checksum and compression algorithm used from the encoded block data
302    let len = block_data.len();
303    let checksum = (&block_data[len - 8..]).get_u64_le();
304    let compression = CompressionAlgorithm::decode(&mut &block_data[len - 9..len - 8])?;
305
306    println!(
307        "\tOffset: {}, Size: {}, Checksum: {}, Compression Algorithm: {:?}, Smallest Key: {:?}",
308        block_meta.offset, block_meta.len, checksum, compression, smallest_key
309    );
310
311    if args.print_entry {
312        print_kv_pairs(
313            block_data,
314            table_data,
315            block_meta.uncompressed_size as usize,
316            args,
317        )?;
318    }
319
320    Ok(())
321}
322
323/// Prints the data of KV-Pairs of a given block out to the terminal.
324fn print_kv_pairs(
325    block_data: Bytes,
326    table_data: &TableData,
327    uncompressed_capacity: usize,
328    args: &SstDumpArgs,
329) -> anyhow::Result<()> {
330    println!("\tKV-Pairs:");
331
332    let block = Box::new(Block::decode(block_data, uncompressed_capacity).unwrap());
333    let holder = BlockHolder::from_owned_block(block);
334    let mut block_iter = BlockIterator::new(holder);
335    block_iter.seek_to_first();
336
337    while block_iter.is_valid() {
338        let full_key = block_iter.key();
339        let full_val = block_iter.value();
340        let humm_val = HummockValue::from_slice(full_val)?;
341
342        let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
343        let date_time = DateTime::<Utc>::from(epoch.as_system_time());
344
345        println!("\t\t   key: {:?}, len={}", full_key, full_key.encoded_len());
346        println!("\t\t value: {:?}, len={}", humm_val, humm_val.encoded_len());
347        println!(
348            "\t\t epoch: {} offset = {}  ({})",
349            epoch,
350            full_key.epoch_with_gap.offset(),
351            date_time
352        );
353        if args.print_table {
354            print_table_column(full_key, humm_val, table_data)?;
355        }
356
357        println!();
358
359        block_iter.next();
360    }
361
362    Ok(())
363}
364
365/// If possible, prints information about the table, column, and stored value.
366fn print_table_column(
367    full_key: FullKey<&[u8]>,
368    humm_val: HummockValue<&[u8]>,
369    table_data: &TableData,
370) -> anyhow::Result<()> {
371    let table_id = full_key.user_key.table_id;
372
373    print!("\t\t table: id={}, ", table_id);
374    let table_catalog = match table_data.get(&table_id) {
375        None => {
376            // Table may have been dropped.
377            println!("(unknown)");
378            return Ok(());
379        }
380        Some(table) => table,
381    };
382    println!(
383        "name={}, version={:?}",
384        table_catalog.name,
385        table_catalog.version()
386    );
387
388    if let Some(user_val) = humm_val.into_user_value() {
389        let column_desc = table_catalog
390            .value_indices
391            .iter()
392            .map(|idx| table_catalog.columns[*idx].column_desc.name.clone())
393            .collect_vec();
394
395        let row_deserializer: EitherSerde = if table_catalog.version().is_some() {
396            ColumnAwareSerde::new(
397                table_catalog.value_indices.clone().into(),
398                Arc::from_iter(
399                    table_catalog
400                        .columns()
401                        .iter()
402                        .cloned()
403                        .map(|c| c.column_desc),
404                ),
405            )
406            .into()
407        } else {
408            BasicSerde::new(
409                table_catalog.value_indices.clone().into(),
410                Arc::from_iter(
411                    table_catalog
412                        .columns()
413                        .iter()
414                        .cloned()
415                        .map(|c| c.column_desc),
416                ),
417            )
418            .into()
419        };
420        let row = row_deserializer.deserialize(user_val)?;
421        for (c, v) in column_desc.iter().zip_eq_fast(row.iter()) {
422            println!(
423                "\t\tcolumn: {} {:?}",
424                c,
425                v.as_ref().map(|v| v.as_scalar_ref_impl().to_text())
426            );
427        }
428    }
429
430    Ok(())
431}