risingwave_ctl/cmd_impl/hummock/
list_kv.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::ops::Bound::Unbounded;
16
17use risingwave_common::util::epoch::is_max_epoch;
18use risingwave_hummock_sdk::HummockReadEpoch;
19use risingwave_storage::StateStore;
20use risingwave_storage::hummock::CachePolicy;
21use risingwave_storage::store::{
22    NewReadSnapshotOptions, PrefetchOptions, ReadOptions, StateStoreIter, StateStoreRead,
23};
24
25use crate::CtlContext;
26use crate::common::HummockServiceOpts;
27
28pub async fn list_kv(
29    context: &CtlContext,
30    epoch: u64,
31    table_id: u32,
32    data_dir: Option<String>,
33    use_new_object_prefix_strategy: bool,
34) -> anyhow::Result<()> {
35    let hummock = context
36        .hummock_store(HummockServiceOpts::from_env(
37            data_dir,
38            use_new_object_prefix_strategy,
39        )?)
40        .await?;
41    if is_max_epoch(epoch) {
42        tracing::info!("using MAX EPOCH as epoch");
43    }
44    let range = (Unbounded, Unbounded);
45    let read_snapshot = hummock
46        .new_read_snapshot(
47            HummockReadEpoch::Committed(epoch),
48            NewReadSnapshotOptions {
49                table_id: table_id.into(),
50            },
51        )
52        .await?;
53    let mut scan_result = read_snapshot
54        .iter(
55            range,
56            ReadOptions {
57                prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(),
58                cache_policy: CachePolicy::NotFill,
59                ..Default::default()
60            },
61        )
62        .await?;
63    while let Some(item) = scan_result.try_next().await? {
64        let (k, v) = item;
65        let print_string = format!("[t{}]", k.user_key.table_id);
66        println!("{} {:?} => {:?}", print_string, k, v)
67    }
68    Ok(())
69}