risingwave_ctl/cmd_impl/table/
scan.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{Result, anyhow};
16use futures::{StreamExt, pin_mut};
17use risingwave_common::bitmap::Bitmap;
18use risingwave_frontend::TableCatalog;
19use risingwave_hummock_sdk::HummockReadEpoch;
20use risingwave_pb::id::TableId;
21use risingwave_rpc_client::MetaClient;
22use risingwave_storage::StateStore;
23use risingwave_storage::hummock::HummockStorage;
24use risingwave_storage::monitor::MonitoredStateStore;
25use risingwave_storage::store::PrefetchOptions;
26use risingwave_storage::table::TableDistribution;
27use risingwave_storage::table::batch_table::BatchTable;
28use risingwave_stream::common::table::state_table::{StateTable, StateTableBuilder};
29
30use crate::CtlContext;
31use crate::common::HummockServiceOpts;
32
33pub async fn get_table_catalog(meta: MetaClient, mv_name: String) -> Result<TableCatalog> {
34    let mvs = meta.risectl_list_state_tables().await?;
35    let mv = mvs
36        .iter()
37        .find(|x| x.name == mv_name)
38        .ok_or_else(|| anyhow!("mv not found"))?
39        .clone();
40    Ok(TableCatalog::from(&mv))
41}
42
43pub async fn get_table_catalog_by_id(meta: MetaClient, table_id: TableId) -> Result<TableCatalog> {
44    let mvs = meta.risectl_list_state_tables().await?;
45    let mv = mvs
46        .iter()
47        .find(|x| x.id == table_id)
48        .ok_or_else(|| anyhow!("mv not found"))?
49        .clone();
50    Ok(TableCatalog::from(&mv))
51}
52
53pub fn print_table_catalog(table: &TableCatalog) {
54    println!("{:#?}", table);
55}
56
57// TODO: shall we work on `TableDesc` instead?
58pub async fn make_state_table<S: StateStore>(hummock: S, table: &TableCatalog) -> StateTable<S> {
59    StateTableBuilder::new(
60        &table.to_internal_table_prost(),
61        hummock,
62        Some(
63            // scan all vnodes
64            TableDistribution::all(table.distribution_key().to_vec(), table.vnode_count())
65                .vnodes()
66                .clone(),
67        ),
68    )
69    .forbid_preload_all_rows()
70    .build()
71    .await
72}
73
74// TODO: shall we work on `TableDesc` instead?
75pub fn make_storage_table<S: StateStore>(
76    hummock: S,
77    table: &TableCatalog,
78) -> Result<BatchTable<S>> {
79    let output_columns_ids = table
80        .columns()
81        .iter()
82        .map(|x| x.column_desc.column_id)
83        .collect();
84    Ok(BatchTable::new_partial(
85        hummock,
86        output_columns_ids,
87        Some(Bitmap::ones(table.vnode_count()).into()),
88        &table.table_desc().try_to_protobuf()?,
89    ))
90}
91
92pub async fn scan(
93    context: &CtlContext,
94    mv_name: String,
95    data_dir: Option<String>,
96    use_new_object_prefix_strategy: bool,
97) -> Result<()> {
98    let meta_client = context.meta_client().await?;
99    let hummock = context
100        .hummock_store(HummockServiceOpts::from_env(
101            data_dir,
102            use_new_object_prefix_strategy,
103        )?)
104        .await?;
105    let table = get_table_catalog(meta_client, mv_name).await?;
106    do_scan(table, hummock).await
107}
108
109pub async fn scan_id(
110    context: &CtlContext,
111    table_id: TableId,
112    data_dir: Option<String>,
113    use_new_object_prefix_strategy: bool,
114) -> Result<()> {
115    let meta_client = context.meta_client().await?;
116    let hummock = context
117        .hummock_store(HummockServiceOpts::from_env(
118            data_dir,
119            use_new_object_prefix_strategy,
120        )?)
121        .await?;
122    let table = get_table_catalog_by_id(meta_client, table_id).await?;
123    do_scan(table, hummock).await
124}
125
126async fn do_scan(table: TableCatalog, hummock: MonitoredStateStore<HummockStorage>) -> Result<()> {
127    print_table_catalog(&table);
128
129    println!("Rows:");
130    let read_epoch = hummock
131        .inner()
132        .get_pinned_version()
133        .table_committed_epoch(table.id);
134    let Some(read_epoch) = read_epoch else {
135        println!(
136            "table {} with id {} not exist in the latest version",
137            table.name, table.id
138        );
139        return Ok(());
140    };
141    let storage_table = make_storage_table(hummock, &table)?;
142    let stream = storage_table
143        .batch_iter(
144            HummockReadEpoch::Committed(read_epoch),
145            true,
146            PrefetchOptions::prefetch_for_large_range_scan(),
147        )
148        .await?;
149    pin_mut!(stream);
150    while let Some(item) = stream.next().await {
151        println!("{:?}", item?);
152    }
153    Ok(())
154}