1use std::collections::HashMap;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use chrono::DateTime;
20use chrono::offset::Utc;
21use clap::Args;
22use futures::TryStreamExt;
23use itertools::Itertools;
24use risingwave_common::types::ToText;
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::iter_util::ZipEqFast;
27use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
28use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
29use risingwave_frontend::TableCatalog;
30use risingwave_hummock_sdk::key::FullKey;
31use risingwave_hummock_sdk::level::Level;
32use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
33use risingwave_hummock_sdk::{HummockObjectId, HummockSstableObjectId};
34use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
35use risingwave_pb::hummock::{PbSstableFilterLayout, PbSstableFilterType};
36use risingwave_pb::id::TableId;
37use risingwave_rpc_client::MetaClient;
38use risingwave_storage::hummock::value::HummockValue;
39use risingwave_storage::hummock::{
40 Block, BlockHolder, BlockIterator, CompressionAlgorithm, Sstable, SstableStore,
41};
42use risingwave_storage::monitor::StoreLocalStatistic;
43use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
44
45use crate::CtlContext;
46use crate::common::HummockServiceOpts;
47
/// State-table catalogs keyed by table id. Loaded from the meta service only when both
/// `--print-entry` and `--print-table` are set, and used to decode entry values per table.
type TableData = HashMap<TableId, TableCatalog>;
49
// Command-line arguments for the SST dump command. Plain `//` comments are used on purpose:
// clap's derive turns `///` doc comments into help text, which would change `--help` output.
#[derive(Args, Debug)]
pub struct SstDumpArgs {
    // Restrict the dump to a single SST object; when absent every object is dumped.
    #[clap(short, long = "object-id")]
    object_id: Option<u64>,
    // Restrict the dump to a single block index within each dumped SST.
    #[clap(short, long = "block-id")]
    block_id: Option<u64>,
    // Decode and print every key/value entry of the dumped blocks.
    #[clap(short = 'p', long = "print-entry")]
    print_entry: bool,
    // Walk the pinned Hummock version and print level information for each SST, instead of
    // listing objects directly from the object store.
    #[clap(short = 'l', long = "print-level")]
    print_level: bool,
    // Decode entry values into table columns (only effective together with `print_entry`).
    #[clap(short = 't', long = "print-table")]
    print_table: bool,
    // Print per-SST vnode statistics alongside level information.
    #[clap(short = 'v', long = "print-vnode-stats")]
    print_vnode_stats: bool,
    // Data directory passed to `HummockServiceOpts::from_env`; only a short `-d` flag is declared.
    #[clap(short = 'd')]
    data_dir: Option<String>,
    // NOTE(review): with clap 4 derive a `bool` field is a `SetTrue` flag, so combined with
    // `default_value = "true"` this may evaluate to `true` whether or not it is passed, i.e. the
    // old prefix strategy might be unreachable from the CLI — confirm against clap's behavior.
    #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
    use_new_object_prefix_strategy: bool,
}
69
70pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
71 println!("Start sst dump with args: {:?}", args);
72 let table_data = if args.print_entry && args.print_table {
73 let meta_client = context.meta_client().await?;
74 load_table_schemas(&meta_client).await?
75 } else {
76 TableData::default()
77 };
78 if args.print_level {
79 let hummock = context
81 .hummock_store(HummockServiceOpts::from_env(
82 args.data_dir.clone(),
83 args.use_new_object_prefix_strategy,
84 )?)
85 .await?;
86 let version = hummock.inner().get_pinned_version().clone();
87 let sstable_store = hummock.sstable_store();
88 for level in version.get_combined_levels() {
89 for sstable_info in &level.table_infos {
90 if let Some(object_id) = &args.object_id {
91 if *object_id == sstable_info.object_id {
92 print_level(level, sstable_info, &args);
93 sst_dump_via_sstable_store(
94 &sstable_store,
95 sstable_info.object_id,
96 sstable_info.meta_offset,
97 sstable_info.file_size,
98 &table_data,
99 &args,
100 )
101 .await?;
102 return Ok(());
103 }
104 } else {
105 print_level(level, sstable_info, &args);
106 sst_dump_via_sstable_store(
107 &sstable_store,
108 sstable_info.object_id,
109 sstable_info.meta_offset,
110 sstable_info.file_size,
111 &table_data,
112 &args,
113 )
114 .await?;
115 }
116 }
117 }
118 } else {
119 let hummock_service_opts = HummockServiceOpts::from_env(
121 args.data_dir.clone(),
122 args.use_new_object_prefix_strategy,
123 )?;
124 let sstable_store = hummock_service_opts
125 .create_sstable_store(args.use_new_object_prefix_strategy)
126 .await?;
127 if let Some(obj_id) = &args.object_id {
128 let obj_store = sstable_store.store();
129 let obj_path = sstable_store.get_sst_data_path(*obj_id);
130 let obj = obj_store.metadata(&obj_path).await?;
131 print_object(&obj);
132 let meta_offset = get_meta_offset_from_object(&obj, obj_store.as_ref()).await?;
133 sst_dump_via_sstable_store(
134 &sstable_store,
135 (*obj_id).into(),
136 meta_offset,
137 obj.total_size as u64,
138 &table_data,
139 &args,
140 )
141 .await?;
142 } else {
143 let mut metadata_iter = sstable_store
144 .list_sst_object_metadata_from_object_store(None, None, None)
145 .await?;
146 while let Some(obj) = metadata_iter.try_next().await? {
147 print_object(&obj);
148 let obj_id = SstableStore::get_object_id_from_path(&obj.key);
149 let obj_id = match obj_id {
150 HummockObjectId::Sstable(obj_id) => obj_id,
151 HummockObjectId::VectorFile(_) | HummockObjectId::HnswGraphFile(_) => {
152 println!(
153 "object id {:?} not a sstable object id: {}. skip",
154 obj_id, obj.key
155 );
156 continue;
157 }
158 };
159 let meta_offset =
160 get_meta_offset_from_object(&obj, sstable_store.store().as_ref()).await?;
161 sst_dump_via_sstable_store(
162 &sstable_store,
163 obj_id,
164 meta_offset,
165 obj.total_size as u64,
166 &table_data,
167 &args,
168 )
169 .await?;
170 }
171 }
172 }
173
174 Ok(())
175}
176
177fn print_level(level: &Level, sst_info: &SstableInfo, args: &SstDumpArgs) {
178 println!("Level Type: {}", level.level_type.as_str_name());
179 println!("Level Idx: {}", level.level_idx);
180 if level.level_idx == 0 {
181 println!("L0 Sub-Level Idx: {}", level.sub_level_id);
182 }
183 println!("SST id: {}", sst_info.sst_id);
184 println!("SST table_ids: {:?}", sst_info.table_ids);
185 if args.print_vnode_stats {
186 if let Some(ref stats) = sst_info.vnode_statistics {
187 println!("Vnode Statistics: {:?}", stats);
188 } else {
189 println!("Vnode Statistics: None");
190 }
191 }
192}
193
194fn print_object(obj: &ObjectMetadata) {
195 println!("Object Key: {}", obj.key);
196 println!("Object Size: {}", obj.total_size);
197 println!("Object Last Modified: {}", obj.last_modified);
198}
199
200async fn get_meta_offset_from_object(
201 obj: &ObjectMetadata,
202 obj_store: &ObjectStoreImpl,
203) -> anyhow::Result<u64> {
204 let start = obj.total_size
205 - (
206 2 * std::mem::size_of::<u32>() +
208 2 * std::mem::size_of::<u64>()
210 );
211 let end = start + std::mem::size_of::<u64>();
212 Ok(obj_store.read(&obj.key, start..end).await?.get_u64_le())
213}
214
215pub async fn sst_dump_via_sstable_store(
216 sstable_store: &SstableStore,
217 object_id: HummockSstableObjectId,
218 meta_offset: u64,
219 file_size: u64,
220 table_data: &TableData,
221 args: &SstDumpArgs,
222) -> anyhow::Result<()> {
223 let sstable_info = SstableInfoInner {
224 object_id,
225 file_size,
226 meta_offset,
227 sst_id: 0.into(),
229 key_range: Default::default(),
230 table_ids: vec![],
231 stale_key_count: 0,
232 total_key_count: 0,
233 min_epoch: 0,
234 max_epoch: 0,
235 uncompressed_file_size: 0,
236 range_tombstone_count: 0,
237 filter_type: PbSstableFilterType::SstableFilterNone,
238 filter_layout: PbSstableFilterLayout::Unspecified,
239 sst_size: 0,
240 vnode_statistics: None,
241 }
242 .into();
243 let sstable_cache = sstable_store
244 .sstable(&sstable_info, &mut StoreLocalStatistic::default())
245 .await?;
246 let sstable = sstable_cache.as_ref();
247 let sstable_meta = &sstable.meta;
248 let smallest_key = FullKey::decode(&sstable_meta.smallest_key);
249 let largest_key = FullKey::decode(&sstable_meta.largest_key);
250
251 println!("SST object id: {}", object_id);
252 println!("-------------------------------------");
253 println!("File Size: {}", sstable_meta.estimated_size);
254
255 println!("Key Range:");
256 println!(
257 "\tleft:\t{:?}\n\tright:\t{:?}\n\t",
258 smallest_key, largest_key,
259 );
260
261 println!("Estimated Table Size: {}", sstable_meta.estimated_size);
262 println!("Bloom Filter Size: {}", sstable_meta.bloom_filter.len());
263 println!("Key Count: {}", sstable_meta.key_count);
264 println!("Version: {}", sstable_meta.version);
265
266 println!("Block Count: {}", sstable.block_count());
267 for i in 0..sstable.block_count() {
268 if let Some(block_id) = &args.block_id {
269 if *block_id == i as u64 {
270 print_block(i, table_data, sstable_store, sstable, args).await?;
271 return Ok(());
272 }
273 } else {
274 print_block(i, table_data, sstable_store, sstable, args).await?;
275 }
276 }
277 Ok(())
278}
279
280async fn load_table_schemas(meta_client: &MetaClient) -> anyhow::Result<TableData> {
283 let mut tables = HashMap::new();
284
285 let mvs = meta_client.risectl_list_state_tables().await?;
286 mvs.iter().for_each(|tbl| {
287 tables.insert(tbl.id, tbl.into());
288 });
289
290 Ok(tables)
291}
292
293async fn print_block(
295 block_idx: usize,
296 table_data: &TableData,
297 sstable_store: &SstableStore,
298 sst: &Sstable,
299 args: &SstDumpArgs,
300) -> anyhow::Result<()> {
301 println!("\tBlock {}", block_idx);
302 println!("\t-----------");
303
304 let block_meta = &sst.meta.block_metas[block_idx];
305 let smallest_key = FullKey::decode(&block_meta.smallest_key);
306 let data_path = sstable_store.get_sst_data_path(sst.id);
307
308 let store = sstable_store.store();
310 let range = block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
311 let block_data = store.read(&data_path, range).await?;
312
313 let len = block_data.len();
315 let checksum = (&block_data[len - 8..]).get_u64_le();
316 let compression = CompressionAlgorithm::decode(&mut &block_data[len - 9..len - 8])?;
317
318 println!(
319 "\tOffset: {}, Size: {}, Checksum: {}, Compression Algorithm: {:?}, Smallest Key: {:?}",
320 block_meta.offset, block_meta.len, checksum, compression, smallest_key
321 );
322
323 if args.print_entry {
324 print_kv_pairs(
325 block_data,
326 table_data,
327 block_meta.uncompressed_size as usize,
328 args,
329 )?;
330 }
331
332 Ok(())
333}
334
335fn print_kv_pairs(
337 block_data: Bytes,
338 table_data: &TableData,
339 uncompressed_capacity: usize,
340 args: &SstDumpArgs,
341) -> anyhow::Result<()> {
342 println!("\tKV-Pairs:");
343
344 let block = Box::new(Block::decode(block_data, uncompressed_capacity).unwrap());
345 let holder = BlockHolder::from_owned_block(block);
346 let mut block_iter = BlockIterator::new(holder);
347 block_iter.seek_to_first();
348
349 while block_iter.is_valid() {
350 let full_key = block_iter.key();
351 let full_val = block_iter.value();
352 let humm_val = HummockValue::from_slice(full_val)?;
353
354 let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
355 let date_time = DateTime::<Utc>::from(epoch.as_system_time());
356
357 println!("\t\t key: {:?}, len={}", full_key, full_key.encoded_len());
358 println!("\t\t value: {:?}, len={}", humm_val, humm_val.encoded_len());
359 println!(
360 "\t\t epoch: {} offset = {} ({})",
361 epoch,
362 full_key.epoch_with_gap.offset(),
363 date_time
364 );
365 if args.print_table {
366 print_table_column(full_key, humm_val, table_data)?;
367 }
368
369 println!();
370
371 block_iter.next();
372 }
373
374 Ok(())
375}
376
377fn print_table_column(
379 full_key: FullKey<&[u8]>,
380 humm_val: HummockValue<&[u8]>,
381 table_data: &TableData,
382) -> anyhow::Result<()> {
383 let table_id = full_key.user_key.table_id;
384
385 print!("\t\t table: id={}, ", table_id);
386 let table_catalog = match table_data.get(&table_id) {
387 None => {
388 println!("(unknown)");
390 return Ok(());
391 }
392 Some(table) => table,
393 };
394 println!(
395 "name={}, version={:?}",
396 table_catalog.name,
397 table_catalog.version()
398 );
399
400 if let Some(user_val) = humm_val.into_user_value() {
401 let column_desc = table_catalog
402 .value_indices
403 .iter()
404 .map(|idx| table_catalog.columns[*idx].column_desc.name.clone())
405 .collect_vec();
406
407 let row_deserializer: EitherSerde = if table_catalog.version().is_some() {
408 ColumnAwareSerde::new(
409 table_catalog.value_indices.clone().into(),
410 Arc::from_iter(
411 table_catalog
412 .columns()
413 .iter()
414 .cloned()
415 .map(|c| c.column_desc),
416 ),
417 )
418 .into()
419 } else {
420 BasicSerde::new(
421 table_catalog.value_indices.clone().into(),
422 Arc::from_iter(
423 table_catalog
424 .columns()
425 .iter()
426 .cloned()
427 .map(|c| c.column_desc),
428 ),
429 )
430 .into()
431 };
432 let row = row_deserializer.deserialize(user_val)?;
433 for (c, v) in column_desc.iter().zip_eq_fast(row.iter()) {
434 println!(
435 "\t\tcolumn: {} {:?}",
436 c,
437 v.as_ref().map(|v| v.as_scalar_ref_impl().to_text())
438 );
439 }
440 }
441
442 Ok(())
443}