1use std::collections::HashMap;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use chrono::DateTime;
20use chrono::offset::Utc;
21use clap::Args;
22use futures::TryStreamExt;
23use itertools::Itertools;
24use risingwave_common::types::ToText;
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::iter_util::ZipEqFast;
27use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
28use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
29use risingwave_frontend::TableCatalog;
30use risingwave_hummock_sdk::key::FullKey;
31use risingwave_hummock_sdk::level::Level;
32use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
33use risingwave_hummock_sdk::{HummockObjectId, HummockSstableObjectId};
34use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
35use risingwave_pb::id::TableId;
36use risingwave_rpc_client::MetaClient;
37use risingwave_storage::hummock::value::HummockValue;
38use risingwave_storage::hummock::{
39 Block, BlockHolder, BlockIterator, CompressionAlgorithm, Sstable, SstableStore,
40};
41use risingwave_storage::monitor::StoreLocalStatistic;
42use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
43
44use crate::CtlContext;
45use crate::common::HummockServiceOpts;
46
/// Maps a table id to its catalog. Consulted when printing the decoded columns of a key-value
/// entry; stays empty when table printing is not requested.
type TableData = HashMap<TableId, TableCatalog>;
48
49#[derive(Args, Debug)]
50pub struct SstDumpArgs {
51 #[clap(short, long = "object-id")]
52 object_id: Option<u64>,
53 #[clap(short, long = "block-id")]
54 block_id: Option<u64>,
55 #[clap(short = 'p', long = "print-entry")]
56 print_entry: bool,
57 #[clap(short = 'l', long = "print-level")]
58 print_level: bool,
59 #[clap(short = 't', long = "print-table")]
60 print_table: bool,
61 #[clap(short = 'd')]
62 data_dir: Option<String>,
63 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
64 use_new_object_prefix_strategy: bool,
65}
66
67pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
68 println!("Start sst dump with args: {:?}", args);
69 let table_data = if args.print_entry && args.print_table {
70 let meta_client = context.meta_client().await?;
71 load_table_schemas(&meta_client).await?
72 } else {
73 TableData::default()
74 };
75 if args.print_level {
76 let hummock = context
78 .hummock_store(HummockServiceOpts::from_env(
79 args.data_dir.clone(),
80 args.use_new_object_prefix_strategy,
81 )?)
82 .await?;
83 let version = hummock.inner().get_pinned_version().clone();
84 let sstable_store = hummock.sstable_store();
85 for level in version.get_combined_levels() {
86 for sstable_info in &level.table_infos {
87 if let Some(object_id) = &args.object_id {
88 if *object_id == sstable_info.object_id {
89 print_level(level, sstable_info);
90 sst_dump_via_sstable_store(
91 &sstable_store,
92 sstable_info.object_id,
93 sstable_info.meta_offset,
94 sstable_info.file_size,
95 &table_data,
96 &args,
97 )
98 .await?;
99 return Ok(());
100 }
101 } else {
102 print_level(level, sstable_info);
103 sst_dump_via_sstable_store(
104 &sstable_store,
105 sstable_info.object_id,
106 sstable_info.meta_offset,
107 sstable_info.file_size,
108 &table_data,
109 &args,
110 )
111 .await?;
112 }
113 }
114 }
115 } else {
116 let hummock_service_opts = HummockServiceOpts::from_env(
118 args.data_dir.clone(),
119 args.use_new_object_prefix_strategy,
120 )?;
121 let sstable_store = hummock_service_opts
122 .create_sstable_store(args.use_new_object_prefix_strategy)
123 .await?;
124 if let Some(obj_id) = &args.object_id {
125 let obj_store = sstable_store.store();
126 let obj_path = sstable_store.get_sst_data_path(*obj_id);
127 let obj = obj_store.metadata(&obj_path).await?;
128 print_object(&obj);
129 let meta_offset = get_meta_offset_from_object(&obj, obj_store.as_ref()).await?;
130 sst_dump_via_sstable_store(
131 &sstable_store,
132 (*obj_id).into(),
133 meta_offset,
134 obj.total_size as u64,
135 &table_data,
136 &args,
137 )
138 .await?;
139 } else {
140 let mut metadata_iter = sstable_store
141 .list_sst_object_metadata_from_object_store(None, None, None)
142 .await?;
143 while let Some(obj) = metadata_iter.try_next().await? {
144 print_object(&obj);
145 let obj_id = SstableStore::get_object_id_from_path(&obj.key);
146 let obj_id = match obj_id {
147 HummockObjectId::Sstable(obj_id) => obj_id,
148 HummockObjectId::VectorFile(_) | HummockObjectId::HnswGraphFile(_) => {
149 println!(
150 "object id {:?} not a sstable object id: {}. skip",
151 obj_id, obj.key
152 );
153 continue;
154 }
155 };
156 let meta_offset =
157 get_meta_offset_from_object(&obj, sstable_store.store().as_ref()).await?;
158 sst_dump_via_sstable_store(
159 &sstable_store,
160 obj_id,
161 meta_offset,
162 obj.total_size as u64,
163 &table_data,
164 &args,
165 )
166 .await?;
167 }
168 }
169 }
170
171 Ok(())
172}
173
174fn print_level(level: &Level, sst_info: &SstableInfo) {
175 println!("Level Type: {}", level.level_type.as_str_name());
176 println!("Level Idx: {}", level.level_idx);
177 if level.level_idx == 0 {
178 println!("L0 Sub-Level Idx: {}", level.sub_level_id);
179 }
180 println!("SST id: {}", sst_info.sst_id);
181 println!("SST table_ids: {:?}", sst_info.table_ids);
182}
183
184fn print_object(obj: &ObjectMetadata) {
185 println!("Object Key: {}", obj.key);
186 println!("Object Size: {}", obj.total_size);
187 println!("Object Last Modified: {}", obj.last_modified);
188}
189
190async fn get_meta_offset_from_object(
191 obj: &ObjectMetadata,
192 obj_store: &ObjectStoreImpl,
193) -> anyhow::Result<u64> {
194 let start = obj.total_size
195 - (
196 2 * std::mem::size_of::<u32>() +
198 2 * std::mem::size_of::<u64>()
200 );
201 let end = start + std::mem::size_of::<u64>();
202 Ok(obj_store.read(&obj.key, start..end).await?.get_u64_le())
203}
204
205pub async fn sst_dump_via_sstable_store(
206 sstable_store: &SstableStore,
207 object_id: HummockSstableObjectId,
208 meta_offset: u64,
209 file_size: u64,
210 table_data: &TableData,
211 args: &SstDumpArgs,
212) -> anyhow::Result<()> {
213 let sstable_info = SstableInfoInner {
214 object_id,
215 file_size,
216 meta_offset,
217 sst_id: 0.into(),
219 key_range: Default::default(),
220 table_ids: vec![],
221 stale_key_count: 0,
222 total_key_count: 0,
223 min_epoch: 0,
224 max_epoch: 0,
225 uncompressed_file_size: 0,
226 range_tombstone_count: 0,
227 bloom_filter_kind: Default::default(),
228 sst_size: 0,
229 }
230 .into();
231 let sstable_cache = sstable_store
232 .sstable(&sstable_info, &mut StoreLocalStatistic::default())
233 .await?;
234 let sstable = sstable_cache.as_ref();
235 let sstable_meta = &sstable.meta;
236 let smallest_key = FullKey::decode(&sstable_meta.smallest_key);
237 let largest_key = FullKey::decode(&sstable_meta.largest_key);
238
239 println!("SST object id: {}", object_id);
240 println!("-------------------------------------");
241 println!("File Size: {}", sstable.estimate_size());
242
243 println!("Key Range:");
244 println!(
245 "\tleft:\t{:?}\n\tright:\t{:?}\n\t",
246 smallest_key, largest_key,
247 );
248
249 println!("Estimated Table Size: {}", sstable_meta.estimated_size);
250 println!("Bloom Filter Size: {}", sstable_meta.bloom_filter.len());
251 println!("Key Count: {}", sstable_meta.key_count);
252 println!("Version: {}", sstable_meta.version);
253
254 println!("Block Count: {}", sstable.block_count());
255 for i in 0..sstable.block_count() {
256 if let Some(block_id) = &args.block_id {
257 if *block_id == i as u64 {
258 print_block(i, table_data, sstable_store, sstable, args).await?;
259 return Ok(());
260 }
261 } else {
262 print_block(i, table_data, sstable_store, sstable, args).await?;
263 }
264 }
265 Ok(())
266}
267
268async fn load_table_schemas(meta_client: &MetaClient) -> anyhow::Result<TableData> {
271 let mut tables = HashMap::new();
272
273 let mvs = meta_client.risectl_list_state_tables().await?;
274 mvs.iter().for_each(|tbl| {
275 tables.insert(tbl.id, tbl.into());
276 });
277
278 Ok(tables)
279}
280
281async fn print_block(
283 block_idx: usize,
284 table_data: &TableData,
285 sstable_store: &SstableStore,
286 sst: &Sstable,
287 args: &SstDumpArgs,
288) -> anyhow::Result<()> {
289 println!("\tBlock {}", block_idx);
290 println!("\t-----------");
291
292 let block_meta = &sst.meta.block_metas[block_idx];
293 let smallest_key = FullKey::decode(&block_meta.smallest_key);
294 let data_path = sstable_store.get_sst_data_path(sst.id);
295
296 let store = sstable_store.store();
298 let range = block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
299 let block_data = store.read(&data_path, range).await?;
300
301 let len = block_data.len();
303 let checksum = (&block_data[len - 8..]).get_u64_le();
304 let compression = CompressionAlgorithm::decode(&mut &block_data[len - 9..len - 8])?;
305
306 println!(
307 "\tOffset: {}, Size: {}, Checksum: {}, Compression Algorithm: {:?}, Smallest Key: {:?}",
308 block_meta.offset, block_meta.len, checksum, compression, smallest_key
309 );
310
311 if args.print_entry {
312 print_kv_pairs(
313 block_data,
314 table_data,
315 block_meta.uncompressed_size as usize,
316 args,
317 )?;
318 }
319
320 Ok(())
321}
322
323fn print_kv_pairs(
325 block_data: Bytes,
326 table_data: &TableData,
327 uncompressed_capacity: usize,
328 args: &SstDumpArgs,
329) -> anyhow::Result<()> {
330 println!("\tKV-Pairs:");
331
332 let block = Box::new(Block::decode(block_data, uncompressed_capacity).unwrap());
333 let holder = BlockHolder::from_owned_block(block);
334 let mut block_iter = BlockIterator::new(holder);
335 block_iter.seek_to_first();
336
337 while block_iter.is_valid() {
338 let full_key = block_iter.key();
339 let full_val = block_iter.value();
340 let humm_val = HummockValue::from_slice(full_val)?;
341
342 let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
343 let date_time = DateTime::<Utc>::from(epoch.as_system_time());
344
345 println!("\t\t key: {:?}, len={}", full_key, full_key.encoded_len());
346 println!("\t\t value: {:?}, len={}", humm_val, humm_val.encoded_len());
347 println!(
348 "\t\t epoch: {} offset = {} ({})",
349 epoch,
350 full_key.epoch_with_gap.offset(),
351 date_time
352 );
353 if args.print_table {
354 print_table_column(full_key, humm_val, table_data)?;
355 }
356
357 println!();
358
359 block_iter.next();
360 }
361
362 Ok(())
363}
364
365fn print_table_column(
367 full_key: FullKey<&[u8]>,
368 humm_val: HummockValue<&[u8]>,
369 table_data: &TableData,
370) -> anyhow::Result<()> {
371 let table_id = full_key.user_key.table_id;
372
373 print!("\t\t table: id={}, ", table_id);
374 let table_catalog = match table_data.get(&table_id) {
375 None => {
376 println!("(unknown)");
378 return Ok(());
379 }
380 Some(table) => table,
381 };
382 println!(
383 "name={}, version={:?}",
384 table_catalog.name,
385 table_catalog.version()
386 );
387
388 if let Some(user_val) = humm_val.into_user_value() {
389 let column_desc = table_catalog
390 .value_indices
391 .iter()
392 .map(|idx| table_catalog.columns[*idx].column_desc.name.clone())
393 .collect_vec();
394
395 let row_deserializer: EitherSerde = if table_catalog.version().is_some() {
396 ColumnAwareSerde::new(
397 table_catalog.value_indices.clone().into(),
398 Arc::from_iter(
399 table_catalog
400 .columns()
401 .iter()
402 .cloned()
403 .map(|c| c.column_desc),
404 ),
405 )
406 .into()
407 } else {
408 BasicSerde::new(
409 table_catalog.value_indices.clone().into(),
410 Arc::from_iter(
411 table_catalog
412 .columns()
413 .iter()
414 .cloned()
415 .map(|c| c.column_desc),
416 ),
417 )
418 .into()
419 };
420 let row = row_deserializer.deserialize(user_val)?;
421 for (c, v) in column_desc.iter().zip_eq_fast(row.iter()) {
422 println!(
423 "\t\tcolumn: {} {:?}",
424 c,
425 v.as_ref().map(|v| v.as_scalar_ref_impl().to_text())
426 );
427 }
428 }
429
430 Ok(())
431}