risingwave_ctl/cmd_impl/table/
scan.rs1use anyhow::{Result, anyhow};
16use futures::{StreamExt, pin_mut};
17use risingwave_common::bitmap::Bitmap;
18use risingwave_frontend::TableCatalog;
19use risingwave_hummock_sdk::HummockReadEpoch;
20use risingwave_rpc_client::MetaClient;
21use risingwave_storage::StateStore;
22use risingwave_storage::hummock::HummockStorage;
23use risingwave_storage::monitor::MonitoredStateStore;
24use risingwave_storage::store::PrefetchOptions;
25use risingwave_storage::table::TableDistribution;
26use risingwave_storage::table::batch_table::BatchTable;
27use risingwave_stream::common::table::state_table::{StateTable, StateTableBuilder};
28
29use crate::CtlContext;
30use crate::common::HummockServiceOpts;
31
32pub async fn get_table_catalog(meta: MetaClient, mv_name: String) -> Result<TableCatalog> {
33 let mvs = meta.risectl_list_state_tables().await?;
34 let mv = mvs
35 .iter()
36 .find(|x| x.name == mv_name)
37 .ok_or_else(|| anyhow!("mv not found"))?
38 .clone();
39 Ok(TableCatalog::from(&mv))
40}
41
42pub async fn get_table_catalog_by_id(meta: MetaClient, table_id: u32) -> Result<TableCatalog> {
43 let mvs = meta.risectl_list_state_tables().await?;
44 let mv = mvs
45 .iter()
46 .find(|x| x.id == table_id)
47 .ok_or_else(|| anyhow!("mv not found"))?
48 .clone();
49 Ok(TableCatalog::from(&mv))
50}
51
52pub fn print_table_catalog(table: &TableCatalog) {
53 println!("{:#?}", table);
54}
55
56pub async fn make_state_table<S: StateStore>(hummock: S, table: &TableCatalog) -> StateTable<S> {
58 StateTableBuilder::new(
59 &table.to_internal_table_prost(),
60 hummock,
61 Some(
62 TableDistribution::all(table.distribution_key().to_vec(), table.vnode_count())
64 .vnodes()
65 .clone(),
66 ),
67 )
68 .forbid_preload_all_rows()
69 .build()
70 .await
71}
72
73pub fn make_storage_table<S: StateStore>(
75 hummock: S,
76 table: &TableCatalog,
77) -> Result<BatchTable<S>> {
78 let output_columns_ids = table
79 .columns()
80 .iter()
81 .map(|x| x.column_desc.column_id)
82 .collect();
83 Ok(BatchTable::new_partial(
84 hummock,
85 output_columns_ids,
86 Some(Bitmap::ones(table.vnode_count()).into()),
87 &table.table_desc().try_to_protobuf()?,
88 ))
89}
90
91pub async fn scan(
92 context: &CtlContext,
93 mv_name: String,
94 data_dir: Option<String>,
95 use_new_object_prefix_strategy: bool,
96) -> Result<()> {
97 let meta_client = context.meta_client().await?;
98 let hummock = context
99 .hummock_store(HummockServiceOpts::from_env(
100 data_dir,
101 use_new_object_prefix_strategy,
102 )?)
103 .await?;
104 let table = get_table_catalog(meta_client, mv_name).await?;
105 do_scan(table, hummock).await
106}
107
108pub async fn scan_id(
109 context: &CtlContext,
110 table_id: u32,
111 data_dir: Option<String>,
112 use_new_object_prefix_strategy: bool,
113) -> Result<()> {
114 let meta_client = context.meta_client().await?;
115 let hummock = context
116 .hummock_store(HummockServiceOpts::from_env(
117 data_dir,
118 use_new_object_prefix_strategy,
119 )?)
120 .await?;
121 let table = get_table_catalog_by_id(meta_client, table_id).await?;
122 do_scan(table, hummock).await
123}
124
125async fn do_scan(table: TableCatalog, hummock: MonitoredStateStore<HummockStorage>) -> Result<()> {
126 print_table_catalog(&table);
127
128 println!("Rows:");
129 let read_epoch = hummock
130 .inner()
131 .get_pinned_version()
132 .table_committed_epoch(table.id);
133 let Some(read_epoch) = read_epoch else {
134 println!(
135 "table {} with id {} not exist in the latest version",
136 table.name, table.id
137 );
138 return Ok(());
139 };
140 let storage_table = make_storage_table(hummock, &table)?;
141 let stream = storage_table
142 .batch_iter(
143 HummockReadEpoch::Committed(read_epoch),
144 true,
145 PrefetchOptions::prefetch_for_large_range_scan(),
146 )
147 .await?;
148 pin_mut!(stream);
149 while let Some(item) = stream.next().await {
150 println!("{:?}", item?);
151 }
152 Ok(())
153}