risingwave_ctl/cmd_impl/table/
scan.rs1use anyhow::{Result, anyhow};
16use futures::{StreamExt, pin_mut};
17use risingwave_common::bitmap::Bitmap;
18use risingwave_frontend::TableCatalog;
19use risingwave_hummock_sdk::HummockReadEpoch;
20use risingwave_rpc_client::MetaClient;
21use risingwave_storage::StateStore;
22use risingwave_storage::hummock::HummockStorage;
23use risingwave_storage::monitor::MonitoredStateStore;
24use risingwave_storage::store::PrefetchOptions;
25use risingwave_storage::table::TableDistribution;
26use risingwave_storage::table::batch_table::BatchTable;
27use risingwave_stream::common::table::state_table::StateTable;
28
29use crate::CtlContext;
30use crate::common::HummockServiceOpts;
31
32pub async fn get_table_catalog(meta: MetaClient, mv_name: String) -> Result<TableCatalog> {
33 let mvs = meta.risectl_list_state_tables().await?;
34 let mv = mvs
35 .iter()
36 .find(|x| x.name == mv_name)
37 .ok_or_else(|| anyhow!("mv not found"))?
38 .clone();
39 Ok(TableCatalog::from(&mv))
40}
41
42pub async fn get_table_catalog_by_id(meta: MetaClient, table_id: u32) -> Result<TableCatalog> {
43 let mvs = meta.risectl_list_state_tables().await?;
44 let mv = mvs
45 .iter()
46 .find(|x| x.id == table_id)
47 .ok_or_else(|| anyhow!("mv not found"))?
48 .clone();
49 Ok(TableCatalog::from(&mv))
50}
51
52pub fn print_table_catalog(table: &TableCatalog) {
53 println!("{:#?}", table);
54}
55
56pub async fn make_state_table<S: StateStore>(hummock: S, table: &TableCatalog) -> StateTable<S> {
58 StateTable::from_table_catalog(
59 &table.to_internal_table_prost(),
60 hummock,
61 Some(
62 TableDistribution::all(table.distribution_key().to_vec(), table.vnode_count())
64 .vnodes()
65 .clone(),
66 ),
67 )
68 .await
69}
70
71pub fn make_storage_table<S: StateStore>(
73 hummock: S,
74 table: &TableCatalog,
75) -> Result<BatchTable<S>> {
76 let output_columns_ids = table
77 .columns()
78 .iter()
79 .map(|x| x.column_desc.column_id)
80 .collect();
81 Ok(BatchTable::new_partial(
82 hummock,
83 output_columns_ids,
84 Some(Bitmap::ones(table.vnode_count()).into()),
85 &table.table_desc().try_to_protobuf()?,
86 ))
87}
88
89pub async fn scan(
90 context: &CtlContext,
91 mv_name: String,
92 data_dir: Option<String>,
93 use_new_object_prefix_strategy: bool,
94) -> Result<()> {
95 let meta_client = context.meta_client().await?;
96 let hummock = context
97 .hummock_store(HummockServiceOpts::from_env(
98 data_dir,
99 use_new_object_prefix_strategy,
100 )?)
101 .await?;
102 let table = get_table_catalog(meta_client, mv_name).await?;
103 do_scan(table, hummock).await
104}
105
106pub async fn scan_id(
107 context: &CtlContext,
108 table_id: u32,
109 data_dir: Option<String>,
110 use_new_object_prefix_strategy: bool,
111) -> Result<()> {
112 let meta_client = context.meta_client().await?;
113 let hummock = context
114 .hummock_store(HummockServiceOpts::from_env(
115 data_dir,
116 use_new_object_prefix_strategy,
117 )?)
118 .await?;
119 let table = get_table_catalog_by_id(meta_client, table_id).await?;
120 do_scan(table, hummock).await
121}
122
123async fn do_scan(table: TableCatalog, hummock: MonitoredStateStore<HummockStorage>) -> Result<()> {
124 print_table_catalog(&table);
125
126 println!("Rows:");
127 let read_epoch = hummock
128 .inner()
129 .get_pinned_version()
130 .table_committed_epoch(table.id);
131 let Some(read_epoch) = read_epoch else {
132 println!(
133 "table {} with id {} not exist in the latest version",
134 table.name, table.id
135 );
136 return Ok(());
137 };
138 let storage_table = make_storage_table(hummock, &table)?;
139 let stream = storage_table
140 .batch_iter(
141 HummockReadEpoch::Committed(read_epoch),
142 true,
143 PrefetchOptions::prefetch_for_large_range_scan(),
144 )
145 .await?;
146 pin_mut!(stream);
147 while let Some(item) = stream.next().await {
148 println!("{:?}", item?);
149 }
150 Ok(())
151}