1use std::collections::HashMap;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use chrono::DateTime;
20use chrono::offset::Utc;
21use clap::Args;
22use futures::TryStreamExt;
23use itertools::Itertools;
24use risingwave_common::types::ToText;
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::iter_util::ZipEqFast;
27use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
28use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
29use risingwave_frontend::TableCatalog;
30use risingwave_hummock_sdk::key::FullKey;
31use risingwave_hummock_sdk::level::Level;
32use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
33use risingwave_hummock_sdk::{HummockObjectId, HummockSstableObjectId};
34use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
35use risingwave_rpc_client::MetaClient;
36use risingwave_storage::hummock::value::HummockValue;
37use risingwave_storage::hummock::{
38 Block, BlockHolder, BlockIterator, CompressionAlgorithm, Sstable, SstableStore,
39};
40use risingwave_storage::monitor::StoreLocalStatistic;
41use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
42
43use crate::CtlContext;
44use crate::common::HummockServiceOpts;
45
/// State-table catalogs keyed by table id. Filled by `load_table_schemas` and
/// consulted when decoding entry values into columns.
type TableData = HashMap<u32, TableCatalog>;
47
48#[derive(Args, Debug)]
49pub struct SstDumpArgs {
50 #[clap(short, long = "object-id")]
51 object_id: Option<u64>,
52 #[clap(short, long = "block-id")]
53 block_id: Option<u64>,
54 #[clap(short = 'p', long = "print-entry")]
55 print_entry: bool,
56 #[clap(short = 'l', long = "print-level")]
57 print_level: bool,
58 #[clap(short = 't', long = "print-table")]
59 print_table: bool,
60 #[clap(short = 'd')]
61 data_dir: Option<String>,
62 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
63 use_new_object_prefix_strategy: bool,
64}
65
66pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
67 println!("Start sst dump with args: {:?}", args);
68 let table_data = if args.print_entry && args.print_table {
69 let meta_client = context.meta_client().await?;
70 load_table_schemas(&meta_client).await?
71 } else {
72 TableData::default()
73 };
74 if args.print_level {
75 let hummock = context
77 .hummock_store(HummockServiceOpts::from_env(
78 args.data_dir.clone(),
79 args.use_new_object_prefix_strategy,
80 )?)
81 .await?;
82 let version = hummock.inner().get_pinned_version().clone();
83 let sstable_store = hummock.sstable_store();
84 for level in version.get_combined_levels() {
85 for sstable_info in &level.table_infos {
86 if let Some(object_id) = &args.object_id {
87 if *object_id == sstable_info.object_id {
88 print_level(level, sstable_info);
89 sst_dump_via_sstable_store(
90 &sstable_store,
91 sstable_info.object_id,
92 sstable_info.meta_offset,
93 sstable_info.file_size,
94 &table_data,
95 &args,
96 )
97 .await?;
98 return Ok(());
99 }
100 } else {
101 print_level(level, sstable_info);
102 sst_dump_via_sstable_store(
103 &sstable_store,
104 sstable_info.object_id,
105 sstable_info.meta_offset,
106 sstable_info.file_size,
107 &table_data,
108 &args,
109 )
110 .await?;
111 }
112 }
113 }
114 } else {
115 let hummock_service_opts = HummockServiceOpts::from_env(
117 args.data_dir.clone(),
118 args.use_new_object_prefix_strategy,
119 )?;
120 let sstable_store = hummock_service_opts
121 .create_sstable_store(args.use_new_object_prefix_strategy)
122 .await?;
123 if let Some(obj_id) = &args.object_id {
124 let obj_store = sstable_store.store();
125 let obj_path = sstable_store.get_sst_data_path(*obj_id);
126 let obj = obj_store.metadata(&obj_path).await?;
127 print_object(&obj);
128 let meta_offset = get_meta_offset_from_object(&obj, obj_store.as_ref()).await?;
129 sst_dump_via_sstable_store(
130 &sstable_store,
131 (*obj_id).into(),
132 meta_offset,
133 obj.total_size as u64,
134 &table_data,
135 &args,
136 )
137 .await?;
138 } else {
139 let mut metadata_iter = sstable_store
140 .list_sst_object_metadata_from_object_store(None, None, None)
141 .await?;
142 while let Some(obj) = metadata_iter.try_next().await? {
143 print_object(&obj);
144 let obj_id = SstableStore::get_object_id_from_path(&obj.key);
145 let obj_id = match obj_id {
146 HummockObjectId::Sstable(obj_id) => obj_id,
147 HummockObjectId::VectorFile(_) => {
148 println!(
149 "object id {:?} not a sstable object id: {}. skip",
150 obj_id, obj.key
151 );
152 continue;
153 }
154 };
155 let meta_offset =
156 get_meta_offset_from_object(&obj, sstable_store.store().as_ref()).await?;
157 sst_dump_via_sstable_store(
158 &sstable_store,
159 obj_id,
160 meta_offset,
161 obj.total_size as u64,
162 &table_data,
163 &args,
164 )
165 .await?;
166 }
167 }
168 }
169
170 Ok(())
171}
172
173fn print_level(level: &Level, sst_info: &SstableInfo) {
174 println!("Level Type: {}", level.level_type.as_str_name());
175 println!("Level Idx: {}", level.level_idx);
176 if level.level_idx == 0 {
177 println!("L0 Sub-Level Idx: {}", level.sub_level_id);
178 }
179 println!("SST id: {}", sst_info.sst_id);
180 println!("SST table_ids: {:?}", sst_info.table_ids);
181}
182
183fn print_object(obj: &ObjectMetadata) {
184 println!("Object Key: {}", obj.key);
185 println!("Object Size: {}", obj.total_size);
186 println!("Object Last Modified: {}", obj.last_modified);
187}
188
189async fn get_meta_offset_from_object(
190 obj: &ObjectMetadata,
191 obj_store: &ObjectStoreImpl,
192) -> anyhow::Result<u64> {
193 let start = obj.total_size
194 - (
195 2 * std::mem::size_of::<u32>() +
197 2 * std::mem::size_of::<u64>()
199 );
200 let end = start + std::mem::size_of::<u64>();
201 Ok(obj_store.read(&obj.key, start..end).await?.get_u64_le())
202}
203
204pub async fn sst_dump_via_sstable_store(
205 sstable_store: &SstableStore,
206 object_id: HummockSstableObjectId,
207 meta_offset: u64,
208 file_size: u64,
209 table_data: &TableData,
210 args: &SstDumpArgs,
211) -> anyhow::Result<()> {
212 let sstable_info = SstableInfoInner {
213 object_id,
214 file_size,
215 meta_offset,
216 sst_id: 0.into(),
218 key_range: Default::default(),
219 table_ids: vec![],
220 stale_key_count: 0,
221 total_key_count: 0,
222 min_epoch: 0,
223 max_epoch: 0,
224 uncompressed_file_size: 0,
225 range_tombstone_count: 0,
226 bloom_filter_kind: Default::default(),
227 sst_size: 0,
228 }
229 .into();
230 let sstable_cache = sstable_store
231 .sstable(&sstable_info, &mut StoreLocalStatistic::default())
232 .await?;
233 let sstable = sstable_cache.as_ref();
234 let sstable_meta = &sstable.meta;
235 let smallest_key = FullKey::decode(&sstable_meta.smallest_key);
236 let largest_key = FullKey::decode(&sstable_meta.largest_key);
237
238 println!("SST object id: {}", object_id);
239 println!("-------------------------------------");
240 println!("File Size: {}", sstable.estimate_size());
241
242 println!("Key Range:");
243 println!(
244 "\tleft:\t{:?}\n\tright:\t{:?}\n\t",
245 smallest_key, largest_key,
246 );
247
248 println!("Estimated Table Size: {}", sstable_meta.estimated_size);
249 println!("Bloom Filter Size: {}", sstable_meta.bloom_filter.len());
250 println!("Key Count: {}", sstable_meta.key_count);
251 println!("Version: {}", sstable_meta.version);
252
253 println!("Block Count: {}", sstable.block_count());
254 for i in 0..sstable.block_count() {
255 if let Some(block_id) = &args.block_id {
256 if *block_id == i as u64 {
257 print_block(i, table_data, sstable_store, sstable, args).await?;
258 return Ok(());
259 }
260 } else {
261 print_block(i, table_data, sstable_store, sstable, args).await?;
262 }
263 }
264 Ok(())
265}
266
267async fn load_table_schemas(meta_client: &MetaClient) -> anyhow::Result<TableData> {
270 let mut tables = HashMap::new();
271
272 let mvs = meta_client.risectl_list_state_tables().await?;
273 mvs.iter().for_each(|tbl| {
274 tables.insert(tbl.id, tbl.into());
275 });
276
277 Ok(tables)
278}
279
280async fn print_block(
282 block_idx: usize,
283 table_data: &TableData,
284 sstable_store: &SstableStore,
285 sst: &Sstable,
286 args: &SstDumpArgs,
287) -> anyhow::Result<()> {
288 println!("\tBlock {}", block_idx);
289 println!("\t-----------");
290
291 let block_meta = &sst.meta.block_metas[block_idx];
292 let smallest_key = FullKey::decode(&block_meta.smallest_key);
293 let data_path = sstable_store.get_sst_data_path(sst.id);
294
295 let store = sstable_store.store();
297 let range = block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
298 let block_data = store.read(&data_path, range).await?;
299
300 let len = block_data.len();
302 let checksum = (&block_data[len - 8..]).get_u64_le();
303 let compression = CompressionAlgorithm::decode(&mut &block_data[len - 9..len - 8])?;
304
305 println!(
306 "\tOffset: {}, Size: {}, Checksum: {}, Compression Algorithm: {:?}, Smallest Key: {:?}",
307 block_meta.offset, block_meta.len, checksum, compression, smallest_key
308 );
309
310 if args.print_entry {
311 print_kv_pairs(
312 block_data,
313 table_data,
314 block_meta.uncompressed_size as usize,
315 args,
316 )?;
317 }
318
319 Ok(())
320}
321
322fn print_kv_pairs(
324 block_data: Bytes,
325 table_data: &TableData,
326 uncompressed_capacity: usize,
327 args: &SstDumpArgs,
328) -> anyhow::Result<()> {
329 println!("\tKV-Pairs:");
330
331 let block = Box::new(Block::decode(block_data, uncompressed_capacity).unwrap());
332 let holder = BlockHolder::from_owned_block(block);
333 let mut block_iter = BlockIterator::new(holder);
334 block_iter.seek_to_first();
335
336 while block_iter.is_valid() {
337 let full_key = block_iter.key();
338 let full_val = block_iter.value();
339 let humm_val = HummockValue::from_slice(full_val)?;
340
341 let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
342 let date_time = DateTime::<Utc>::from(epoch.as_system_time());
343
344 println!("\t\t key: {:?}, len={}", full_key, full_key.encoded_len());
345 println!("\t\t value: {:?}, len={}", humm_val, humm_val.encoded_len());
346 println!(
347 "\t\t epoch: {} offset = {} ({})",
348 epoch,
349 full_key.epoch_with_gap.offset(),
350 date_time
351 );
352 if args.print_table {
353 print_table_column(full_key, humm_val, table_data)?;
354 }
355
356 println!();
357
358 block_iter.next();
359 }
360
361 Ok(())
362}
363
364fn print_table_column(
366 full_key: FullKey<&[u8]>,
367 humm_val: HummockValue<&[u8]>,
368 table_data: &TableData,
369) -> anyhow::Result<()> {
370 let table_id = full_key.user_key.table_id.table_id();
371
372 print!("\t\t table: id={}, ", table_id);
373 let table_catalog = match table_data.get(&table_id) {
374 None => {
375 println!("(unknown)");
377 return Ok(());
378 }
379 Some(table) => table,
380 };
381 println!(
382 "name={}, version={:?}",
383 table_catalog.name,
384 table_catalog.version()
385 );
386
387 if let Some(user_val) = humm_val.into_user_value() {
388 let column_desc = table_catalog
389 .value_indices
390 .iter()
391 .map(|idx| table_catalog.columns[*idx].column_desc.name.clone())
392 .collect_vec();
393
394 let row_deserializer: EitherSerde = if table_catalog.version().is_some() {
395 ColumnAwareSerde::new(
396 table_catalog.value_indices.clone().into(),
397 Arc::from_iter(
398 table_catalog
399 .columns()
400 .iter()
401 .cloned()
402 .map(|c| c.column_desc),
403 ),
404 )
405 .into()
406 } else {
407 BasicSerde::new(
408 table_catalog.value_indices.clone().into(),
409 Arc::from_iter(
410 table_catalog
411 .columns()
412 .iter()
413 .cloned()
414 .map(|c| c.column_desc),
415 ),
416 )
417 .into()
418 };
419 let row = row_deserializer.deserialize(user_val)?;
420 for (c, v) in column_desc.iter().zip_eq_fast(row.iter()) {
421 println!(
422 "\t\tcolumn: {} {:?}",
423 c,
424 v.as_ref().map(|v| v.as_scalar_ref_impl().to_text())
425 );
426 }
427 }
428
429 Ok(())
430}