1use std::collections::HashMap;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use chrono::DateTime;
20use chrono::offset::Utc;
21use clap::Args;
22use futures::TryStreamExt;
23use itertools::Itertools;
24use risingwave_common::types::ToText;
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::iter_util::ZipEqFast;
27use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
28use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
29use risingwave_frontend::TableCatalog;
30use risingwave_hummock_sdk::key::FullKey;
31use risingwave_hummock_sdk::level::Level;
32use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
33use risingwave_hummock_sdk::{HummockObjectId, HummockSstableObjectId};
34use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
35use risingwave_pb::id::TableId;
36use risingwave_rpc_client::MetaClient;
37use risingwave_storage::hummock::value::HummockValue;
38use risingwave_storage::hummock::{
39 Block, BlockHolder, BlockIterator, CompressionAlgorithm, Sstable, SstableStore,
40};
41use risingwave_storage::monitor::StoreLocalStatistic;
42use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
43
44use crate::CtlContext;
45use crate::common::HummockServiceOpts;
46
/// Table catalogs keyed by table id.
///
/// Filled by `load_table_schemas` and consulted by `print_table_column` to decode the value of
/// an entry into named columns.
type TableData = HashMap<TableId, TableCatalog>;
48
// Command-line arguments of the SST dump command.
//
// Plain `//` comments are used on purpose: with `#[derive(Args)]`, `///` doc comments would
// presumably be picked up by clap as help text and change the CLI output.
#[derive(Args, Debug)]
pub struct SstDumpArgs {
    // When set, only the SST object with this id is dumped; otherwise every object is dumped.
    #[clap(short, long = "object-id")]
    object_id: Option<u64>,
    // When set, only the block with this index is printed for each dumped SST.
    #[clap(short, long = "block-id")]
    block_id: Option<u64>,
    // Print the key-value entries stored in each printed block.
    #[clap(short = 'p', long = "print-entry")]
    print_entry: bool,
    // Walk the SSTs of the pinned hummock version and print level information for each one,
    // instead of reading objects straight from the object store.
    #[clap(short = 'l', long = "print-level")]
    print_level: bool,
    // Together with `print_entry`: load table schemas from the meta service and print the
    // decoded columns of every entry.
    #[clap(short = 't', long = "print-table")]
    print_table: bool,
    // Include the SST's vnode statistics in the level information (only read by `print_level`).
    #[clap(short = 'v', long = "print-vnode-stats")]
    print_vnode_stats: bool,
    // Forwarded to `HummockServiceOpts::from_env`.
    #[clap(short = 'd')]
    data_dir: Option<String>,
    // Forwarded to `HummockServiceOpts::from_env` and `create_sstable_store`.
    // NOTE(review): a `bool` with `default_value = "true"` — confirm whether this can ever be
    // turned off from the command line.
    #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
    use_new_object_prefix_strategy: bool,
}
68
69pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
70 println!("Start sst dump with args: {:?}", args);
71 let table_data = if args.print_entry && args.print_table {
72 let meta_client = context.meta_client().await?;
73 load_table_schemas(&meta_client).await?
74 } else {
75 TableData::default()
76 };
77 if args.print_level {
78 let hummock = context
80 .hummock_store(HummockServiceOpts::from_env(
81 args.data_dir.clone(),
82 args.use_new_object_prefix_strategy,
83 )?)
84 .await?;
85 let version = hummock.inner().get_pinned_version().clone();
86 let sstable_store = hummock.sstable_store();
87 for level in version.get_combined_levels() {
88 for sstable_info in &level.table_infos {
89 if let Some(object_id) = &args.object_id {
90 if *object_id == sstable_info.object_id {
91 print_level(level, sstable_info, &args);
92 sst_dump_via_sstable_store(
93 &sstable_store,
94 sstable_info.object_id,
95 sstable_info.meta_offset,
96 sstable_info.file_size,
97 &table_data,
98 &args,
99 )
100 .await?;
101 return Ok(());
102 }
103 } else {
104 print_level(level, sstable_info, &args);
105 sst_dump_via_sstable_store(
106 &sstable_store,
107 sstable_info.object_id,
108 sstable_info.meta_offset,
109 sstable_info.file_size,
110 &table_data,
111 &args,
112 )
113 .await?;
114 }
115 }
116 }
117 } else {
118 let hummock_service_opts = HummockServiceOpts::from_env(
120 args.data_dir.clone(),
121 args.use_new_object_prefix_strategy,
122 )?;
123 let sstable_store = hummock_service_opts
124 .create_sstable_store(args.use_new_object_prefix_strategy)
125 .await?;
126 if let Some(obj_id) = &args.object_id {
127 let obj_store = sstable_store.store();
128 let obj_path = sstable_store.get_sst_data_path(*obj_id);
129 let obj = obj_store.metadata(&obj_path).await?;
130 print_object(&obj);
131 let meta_offset = get_meta_offset_from_object(&obj, obj_store.as_ref()).await?;
132 sst_dump_via_sstable_store(
133 &sstable_store,
134 (*obj_id).into(),
135 meta_offset,
136 obj.total_size as u64,
137 &table_data,
138 &args,
139 )
140 .await?;
141 } else {
142 let mut metadata_iter = sstable_store
143 .list_sst_object_metadata_from_object_store(None, None, None)
144 .await?;
145 while let Some(obj) = metadata_iter.try_next().await? {
146 print_object(&obj);
147 let obj_id = SstableStore::get_object_id_from_path(&obj.key);
148 let obj_id = match obj_id {
149 HummockObjectId::Sstable(obj_id) => obj_id,
150 HummockObjectId::VectorFile(_) | HummockObjectId::HnswGraphFile(_) => {
151 println!(
152 "object id {:?} not a sstable object id: {}. skip",
153 obj_id, obj.key
154 );
155 continue;
156 }
157 };
158 let meta_offset =
159 get_meta_offset_from_object(&obj, sstable_store.store().as_ref()).await?;
160 sst_dump_via_sstable_store(
161 &sstable_store,
162 obj_id,
163 meta_offset,
164 obj.total_size as u64,
165 &table_data,
166 &args,
167 )
168 .await?;
169 }
170 }
171 }
172
173 Ok(())
174}
175
176fn print_level(level: &Level, sst_info: &SstableInfo, args: &SstDumpArgs) {
177 println!("Level Type: {}", level.level_type.as_str_name());
178 println!("Level Idx: {}", level.level_idx);
179 if level.level_idx == 0 {
180 println!("L0 Sub-Level Idx: {}", level.sub_level_id);
181 }
182 println!("SST id: {}", sst_info.sst_id);
183 println!("SST table_ids: {:?}", sst_info.table_ids);
184 if args.print_vnode_stats {
185 if let Some(ref stats) = sst_info.vnode_statistics {
186 println!("Vnode Statistics: {:?}", stats);
187 } else {
188 println!("Vnode Statistics: None");
189 }
190 }
191}
192
193fn print_object(obj: &ObjectMetadata) {
194 println!("Object Key: {}", obj.key);
195 println!("Object Size: {}", obj.total_size);
196 println!("Object Last Modified: {}", obj.last_modified);
197}
198
199async fn get_meta_offset_from_object(
200 obj: &ObjectMetadata,
201 obj_store: &ObjectStoreImpl,
202) -> anyhow::Result<u64> {
203 let start = obj.total_size
204 - (
205 2 * std::mem::size_of::<u32>() +
207 2 * std::mem::size_of::<u64>()
209 );
210 let end = start + std::mem::size_of::<u64>();
211 Ok(obj_store.read(&obj.key, start..end).await?.get_u64_le())
212}
213
214pub async fn sst_dump_via_sstable_store(
215 sstable_store: &SstableStore,
216 object_id: HummockSstableObjectId,
217 meta_offset: u64,
218 file_size: u64,
219 table_data: &TableData,
220 args: &SstDumpArgs,
221) -> anyhow::Result<()> {
222 let sstable_info = SstableInfoInner {
223 object_id,
224 file_size,
225 meta_offset,
226 sst_id: 0.into(),
228 key_range: Default::default(),
229 table_ids: vec![],
230 stale_key_count: 0,
231 total_key_count: 0,
232 min_epoch: 0,
233 max_epoch: 0,
234 uncompressed_file_size: 0,
235 range_tombstone_count: 0,
236 bloom_filter_kind: Default::default(),
237 sst_size: 0,
238 vnode_statistics: None,
239 }
240 .into();
241 let sstable_cache = sstable_store
242 .sstable(&sstable_info, &mut StoreLocalStatistic::default())
243 .await?;
244 let sstable = sstable_cache.as_ref();
245 let sstable_meta = &sstable.meta;
246 let smallest_key = FullKey::decode(&sstable_meta.smallest_key);
247 let largest_key = FullKey::decode(&sstable_meta.largest_key);
248
249 println!("SST object id: {}", object_id);
250 println!("-------------------------------------");
251 println!("File Size: {}", sstable.estimate_size());
252
253 println!("Key Range:");
254 println!(
255 "\tleft:\t{:?}\n\tright:\t{:?}\n\t",
256 smallest_key, largest_key,
257 );
258
259 println!("Estimated Table Size: {}", sstable_meta.estimated_size);
260 println!("Bloom Filter Size: {}", sstable_meta.bloom_filter.len());
261 println!("Key Count: {}", sstable_meta.key_count);
262 println!("Version: {}", sstable_meta.version);
263
264 println!("Block Count: {}", sstable.block_count());
265 for i in 0..sstable.block_count() {
266 if let Some(block_id) = &args.block_id {
267 if *block_id == i as u64 {
268 print_block(i, table_data, sstable_store, sstable, args).await?;
269 return Ok(());
270 }
271 } else {
272 print_block(i, table_data, sstable_store, sstable, args).await?;
273 }
274 }
275 Ok(())
276}
277
278async fn load_table_schemas(meta_client: &MetaClient) -> anyhow::Result<TableData> {
281 let mut tables = HashMap::new();
282
283 let mvs = meta_client.risectl_list_state_tables().await?;
284 mvs.iter().for_each(|tbl| {
285 tables.insert(tbl.id, tbl.into());
286 });
287
288 Ok(tables)
289}
290
291async fn print_block(
293 block_idx: usize,
294 table_data: &TableData,
295 sstable_store: &SstableStore,
296 sst: &Sstable,
297 args: &SstDumpArgs,
298) -> anyhow::Result<()> {
299 println!("\tBlock {}", block_idx);
300 println!("\t-----------");
301
302 let block_meta = &sst.meta.block_metas[block_idx];
303 let smallest_key = FullKey::decode(&block_meta.smallest_key);
304 let data_path = sstable_store.get_sst_data_path(sst.id);
305
306 let store = sstable_store.store();
308 let range = block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
309 let block_data = store.read(&data_path, range).await?;
310
311 let len = block_data.len();
313 let checksum = (&block_data[len - 8..]).get_u64_le();
314 let compression = CompressionAlgorithm::decode(&mut &block_data[len - 9..len - 8])?;
315
316 println!(
317 "\tOffset: {}, Size: {}, Checksum: {}, Compression Algorithm: {:?}, Smallest Key: {:?}",
318 block_meta.offset, block_meta.len, checksum, compression, smallest_key
319 );
320
321 if args.print_entry {
322 print_kv_pairs(
323 block_data,
324 table_data,
325 block_meta.uncompressed_size as usize,
326 args,
327 )?;
328 }
329
330 Ok(())
331}
332
333fn print_kv_pairs(
335 block_data: Bytes,
336 table_data: &TableData,
337 uncompressed_capacity: usize,
338 args: &SstDumpArgs,
339) -> anyhow::Result<()> {
340 println!("\tKV-Pairs:");
341
342 let block = Box::new(Block::decode(block_data, uncompressed_capacity).unwrap());
343 let holder = BlockHolder::from_owned_block(block);
344 let mut block_iter = BlockIterator::new(holder);
345 block_iter.seek_to_first();
346
347 while block_iter.is_valid() {
348 let full_key = block_iter.key();
349 let full_val = block_iter.value();
350 let humm_val = HummockValue::from_slice(full_val)?;
351
352 let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
353 let date_time = DateTime::<Utc>::from(epoch.as_system_time());
354
355 println!("\t\t key: {:?}, len={}", full_key, full_key.encoded_len());
356 println!("\t\t value: {:?}, len={}", humm_val, humm_val.encoded_len());
357 println!(
358 "\t\t epoch: {} offset = {} ({})",
359 epoch,
360 full_key.epoch_with_gap.offset(),
361 date_time
362 );
363 if args.print_table {
364 print_table_column(full_key, humm_val, table_data)?;
365 }
366
367 println!();
368
369 block_iter.next();
370 }
371
372 Ok(())
373}
374
375fn print_table_column(
377 full_key: FullKey<&[u8]>,
378 humm_val: HummockValue<&[u8]>,
379 table_data: &TableData,
380) -> anyhow::Result<()> {
381 let table_id = full_key.user_key.table_id;
382
383 print!("\t\t table: id={}, ", table_id);
384 let table_catalog = match table_data.get(&table_id) {
385 None => {
386 println!("(unknown)");
388 return Ok(());
389 }
390 Some(table) => table,
391 };
392 println!(
393 "name={}, version={:?}",
394 table_catalog.name,
395 table_catalog.version()
396 );
397
398 if let Some(user_val) = humm_val.into_user_value() {
399 let column_desc = table_catalog
400 .value_indices
401 .iter()
402 .map(|idx| table_catalog.columns[*idx].column_desc.name.clone())
403 .collect_vec();
404
405 let row_deserializer: EitherSerde = if table_catalog.version().is_some() {
406 ColumnAwareSerde::new(
407 table_catalog.value_indices.clone().into(),
408 Arc::from_iter(
409 table_catalog
410 .columns()
411 .iter()
412 .cloned()
413 .map(|c| c.column_desc),
414 ),
415 )
416 .into()
417 } else {
418 BasicSerde::new(
419 table_catalog.value_indices.clone().into(),
420 Arc::from_iter(
421 table_catalog
422 .columns()
423 .iter()
424 .cloned()
425 .map(|c| c.column_desc),
426 ),
427 )
428 .into()
429 };
430 let row = row_deserializer.deserialize(user_val)?;
431 for (c, v) in column_desc.iter().zip_eq_fast(row.iter()) {
432 println!(
433 "\t\tcolumn: {} {:?}",
434 c,
435 v.as_ref().map(|v| v.as_scalar_ref_impl().to_text())
436 );
437 }
438 }
439
440 Ok(())
441}