risingwave_ctl/cmd_impl/hummock/
list_kv.rs1use core::ops::Bound::Unbounded;
16
17use risingwave_common::catalog::TableId;
18use risingwave_common::util::epoch::is_max_epoch;
19use risingwave_hummock_sdk::HummockReadEpoch;
20use risingwave_storage::StateStore;
21use risingwave_storage::hummock::CachePolicy;
22use risingwave_storage::store::{
23 NewReadSnapshotOptions, PrefetchOptions, ReadOptions, StateStoreIter, StateStoreRead,
24};
25
26use crate::CtlContext;
27use crate::common::HummockServiceOpts;
28
29pub async fn list_kv(
30 context: &CtlContext,
31 epoch: u64,
32 table_id: u32,
33 data_dir: Option<String>,
34 use_new_object_prefix_strategy: bool,
35) -> anyhow::Result<()> {
36 let hummock = context
37 .hummock_store(HummockServiceOpts::from_env(
38 data_dir,
39 use_new_object_prefix_strategy,
40 )?)
41 .await?;
42 if is_max_epoch(epoch) {
43 tracing::info!("using MAX EPOCH as epoch");
44 }
45 let range = (Unbounded, Unbounded);
46 let read_snapshot = hummock
47 .new_read_snapshot(
48 HummockReadEpoch::Committed(epoch),
49 NewReadSnapshotOptions {
50 table_id: TableId { table_id },
51 },
52 )
53 .await?;
54 let mut scan_result = read_snapshot
55 .iter(
56 range,
57 ReadOptions {
58 table_id: TableId { table_id },
59 prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(),
60 cache_policy: CachePolicy::NotFill,
61 ..Default::default()
62 },
63 )
64 .await?;
65 while let Some(item) = scan_result.try_next().await? {
66 let (k, v) = item;
67 let print_string = format!("[t{}]", k.user_key.table_id.table_id());
68 println!("{} {:?} => {:?}", print_string, k, v)
69 }
70 Ok(())
71}