risingwave_ctl/cmd_impl/table/
scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{Result, anyhow};
16use futures::{StreamExt, pin_mut};
17use risingwave_common::bitmap::Bitmap;
18use risingwave_frontend::TableCatalog;
19use risingwave_hummock_sdk::HummockReadEpoch;
20use risingwave_rpc_client::MetaClient;
21use risingwave_storage::StateStore;
22use risingwave_storage::hummock::HummockStorage;
23use risingwave_storage::monitor::MonitoredStateStore;
24use risingwave_storage::store::PrefetchOptions;
25use risingwave_storage::table::TableDistribution;
26use risingwave_storage::table::batch_table::BatchTable;
27use risingwave_stream::common::table::state_table::StateTable;
28
29use crate::CtlContext;
30use crate::common::HummockServiceOpts;
31
32pub async fn get_table_catalog(meta: MetaClient, mv_name: String) -> Result<TableCatalog> {
33    let mvs = meta.risectl_list_state_tables().await?;
34    let mv = mvs
35        .iter()
36        .find(|x| x.name == mv_name)
37        .ok_or_else(|| anyhow!("mv not found"))?
38        .clone();
39    Ok(TableCatalog::from(&mv))
40}
41
42pub async fn get_table_catalog_by_id(meta: MetaClient, table_id: u32) -> Result<TableCatalog> {
43    let mvs = meta.risectl_list_state_tables().await?;
44    let mv = mvs
45        .iter()
46        .find(|x| x.id == table_id)
47        .ok_or_else(|| anyhow!("mv not found"))?
48        .clone();
49    Ok(TableCatalog::from(&mv))
50}
51
52pub fn print_table_catalog(table: &TableCatalog) {
53    println!("{:#?}", table);
54}
55
56// TODO: shall we work on `TableDesc` instead?
57pub async fn make_state_table<S: StateStore>(hummock: S, table: &TableCatalog) -> StateTable<S> {
58    StateTable::from_table_catalog(
59        &table.to_internal_table_prost(),
60        hummock,
61        Some(
62            // scan all vnodes
63            TableDistribution::all(table.distribution_key().to_vec(), table.vnode_count())
64                .vnodes()
65                .clone(),
66        ),
67    )
68    .await
69}
70
71// TODO: shall we work on `TableDesc` instead?
72pub fn make_storage_table<S: StateStore>(
73    hummock: S,
74    table: &TableCatalog,
75) -> Result<BatchTable<S>> {
76    let output_columns_ids = table
77        .columns()
78        .iter()
79        .map(|x| x.column_desc.column_id)
80        .collect();
81    Ok(BatchTable::new_partial(
82        hummock,
83        output_columns_ids,
84        Some(Bitmap::ones(table.vnode_count()).into()),
85        &table.table_desc().try_to_protobuf()?,
86    ))
87}
88
89pub async fn scan(
90    context: &CtlContext,
91    mv_name: String,
92    data_dir: Option<String>,
93    use_new_object_prefix_strategy: bool,
94) -> Result<()> {
95    let meta_client = context.meta_client().await?;
96    let hummock = context
97        .hummock_store(HummockServiceOpts::from_env(
98            data_dir,
99            use_new_object_prefix_strategy,
100        )?)
101        .await?;
102    let table = get_table_catalog(meta_client, mv_name).await?;
103    do_scan(table, hummock).await
104}
105
106pub async fn scan_id(
107    context: &CtlContext,
108    table_id: u32,
109    data_dir: Option<String>,
110    use_new_object_prefix_strategy: bool,
111) -> Result<()> {
112    let meta_client = context.meta_client().await?;
113    let hummock = context
114        .hummock_store(HummockServiceOpts::from_env(
115            data_dir,
116            use_new_object_prefix_strategy,
117        )?)
118        .await?;
119    let table = get_table_catalog_by_id(meta_client, table_id).await?;
120    do_scan(table, hummock).await
121}
122
123async fn do_scan(table: TableCatalog, hummock: MonitoredStateStore<HummockStorage>) -> Result<()> {
124    print_table_catalog(&table);
125
126    println!("Rows:");
127    let read_epoch = hummock
128        .inner()
129        .get_pinned_version()
130        .table_committed_epoch(table.id);
131    let Some(read_epoch) = read_epoch else {
132        println!(
133            "table {} with id {} not exist in the latest version",
134            table.name, table.id
135        );
136        return Ok(());
137    };
138    let storage_table = make_storage_table(hummock, &table)?;
139    let stream = storage_table
140        .batch_iter(
141            HummockReadEpoch::Committed(read_epoch),
142            true,
143            PrefetchOptions::prefetch_for_large_range_scan(),
144        )
145        .await?;
146    pin_mut!(stream);
147    while let Some(item) = stream.next().await {
148        println!("{:?}", item?);
149    }
150    Ok(())
151}