risingwave_ctl/cmd_impl/hummock/
list_kv.rs1use core::ops::Bound::Unbounded;
16
17use risingwave_common::util::epoch::is_max_epoch;
18use risingwave_hummock_sdk::HummockReadEpoch;
19use risingwave_storage::StateStore;
20use risingwave_storage::hummock::CachePolicy;
21use risingwave_storage::store::{
22 NewReadSnapshotOptions, PrefetchOptions, ReadOptions, StateStoreIter, StateStoreRead,
23};
24
25use crate::CtlContext;
26use crate::common::HummockServiceOpts;
27
28pub async fn list_kv(
29 context: &CtlContext,
30 epoch: u64,
31 table_id: u32,
32 data_dir: Option<String>,
33 use_new_object_prefix_strategy: bool,
34) -> anyhow::Result<()> {
35 let hummock = context
36 .hummock_store(HummockServiceOpts::from_env(
37 data_dir,
38 use_new_object_prefix_strategy,
39 )?)
40 .await?;
41 if is_max_epoch(epoch) {
42 tracing::info!("using MAX EPOCH as epoch");
43 }
44 let range = (Unbounded, Unbounded);
45 let read_snapshot = hummock
46 .new_read_snapshot(
47 HummockReadEpoch::Committed(epoch),
48 NewReadSnapshotOptions {
49 table_id: table_id.into(),
50 },
51 )
52 .await?;
53 let mut scan_result = read_snapshot
54 .iter(
55 range,
56 ReadOptions {
57 prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(),
58 cache_policy: CachePolicy::NotFill,
59 ..Default::default()
60 },
61 )
62 .await?;
63 while let Some(item) = scan_result.try_next().await? {
64 let (k, v) = item;
65 let print_string = format!("[t{}]", k.user_key.table_id);
66 println!("{} {:?} => {:?}", print_string, k, v)
67 }
68 Ok(())
69}