risingwave_ctl/cmd_impl/table/
scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{Result, anyhow};
16use futures::{StreamExt, pin_mut};
17use risingwave_common::bitmap::Bitmap;
18use risingwave_frontend::TableCatalog;
19use risingwave_hummock_sdk::HummockReadEpoch;
20use risingwave_rpc_client::MetaClient;
21use risingwave_storage::StateStore;
22use risingwave_storage::hummock::HummockStorage;
23use risingwave_storage::monitor::MonitoredStateStore;
24use risingwave_storage::store::PrefetchOptions;
25use risingwave_storage::table::TableDistribution;
26use risingwave_storage::table::batch_table::BatchTable;
27use risingwave_stream::common::table::state_table::{StateTable, StateTableBuilder};
28
29use crate::CtlContext;
30use crate::common::HummockServiceOpts;
31
32pub async fn get_table_catalog(meta: MetaClient, mv_name: String) -> Result<TableCatalog> {
33    let mvs = meta.risectl_list_state_tables().await?;
34    let mv = mvs
35        .iter()
36        .find(|x| x.name == mv_name)
37        .ok_or_else(|| anyhow!("mv not found"))?
38        .clone();
39    Ok(TableCatalog::from(&mv))
40}
41
42pub async fn get_table_catalog_by_id(meta: MetaClient, table_id: u32) -> Result<TableCatalog> {
43    let mvs = meta.risectl_list_state_tables().await?;
44    let mv = mvs
45        .iter()
46        .find(|x| x.id == table_id)
47        .ok_or_else(|| anyhow!("mv not found"))?
48        .clone();
49    Ok(TableCatalog::from(&mv))
50}
51
52pub fn print_table_catalog(table: &TableCatalog) {
53    println!("{:#?}", table);
54}
55
56// TODO: shall we work on `TableDesc` instead?
57pub async fn make_state_table<S: StateStore>(hummock: S, table: &TableCatalog) -> StateTable<S> {
58    StateTableBuilder::new(
59        &table.to_internal_table_prost(),
60        hummock,
61        Some(
62            // scan all vnodes
63            TableDistribution::all(table.distribution_key().to_vec(), table.vnode_count())
64                .vnodes()
65                .clone(),
66        ),
67    )
68    .forbid_preload_all_rows()
69    .build()
70    .await
71}
72
73// TODO: shall we work on `TableDesc` instead?
74pub fn make_storage_table<S: StateStore>(
75    hummock: S,
76    table: &TableCatalog,
77) -> Result<BatchTable<S>> {
78    let output_columns_ids = table
79        .columns()
80        .iter()
81        .map(|x| x.column_desc.column_id)
82        .collect();
83    Ok(BatchTable::new_partial(
84        hummock,
85        output_columns_ids,
86        Some(Bitmap::ones(table.vnode_count()).into()),
87        &table.table_desc().try_to_protobuf()?,
88    ))
89}
90
91pub async fn scan(
92    context: &CtlContext,
93    mv_name: String,
94    data_dir: Option<String>,
95    use_new_object_prefix_strategy: bool,
96) -> Result<()> {
97    let meta_client = context.meta_client().await?;
98    let hummock = context
99        .hummock_store(HummockServiceOpts::from_env(
100            data_dir,
101            use_new_object_prefix_strategy,
102        )?)
103        .await?;
104    let table = get_table_catalog(meta_client, mv_name).await?;
105    do_scan(table, hummock).await
106}
107
108pub async fn scan_id(
109    context: &CtlContext,
110    table_id: u32,
111    data_dir: Option<String>,
112    use_new_object_prefix_strategy: bool,
113) -> Result<()> {
114    let meta_client = context.meta_client().await?;
115    let hummock = context
116        .hummock_store(HummockServiceOpts::from_env(
117            data_dir,
118            use_new_object_prefix_strategy,
119        )?)
120        .await?;
121    let table = get_table_catalog_by_id(meta_client, table_id).await?;
122    do_scan(table, hummock).await
123}
124
125async fn do_scan(table: TableCatalog, hummock: MonitoredStateStore<HummockStorage>) -> Result<()> {
126    print_table_catalog(&table);
127
128    println!("Rows:");
129    let read_epoch = hummock
130        .inner()
131        .get_pinned_version()
132        .table_committed_epoch(table.id);
133    let Some(read_epoch) = read_epoch else {
134        println!(
135            "table {} with id {} not exist in the latest version",
136            table.name, table.id
137        );
138        return Ok(());
139    };
140    let storage_table = make_storage_table(hummock, &table)?;
141    let stream = storage_table
142        .batch_iter(
143            HummockReadEpoch::Committed(read_epoch),
144            true,
145            PrefetchOptions::prefetch_for_large_range_scan(),
146        )
147        .await?;
148    pin_mut!(stream);
149    while let Some(item) = stream.next().await {
150        println!("{:?}", item?);
151    }
152    Ok(())
153}