risingwave_ctl/cmd_impl/table/
scan.rs1use anyhow::{Result, anyhow};
16use futures::{StreamExt, pin_mut};
17use risingwave_common::bitmap::Bitmap;
18use risingwave_frontend::TableCatalog;
19use risingwave_hummock_sdk::HummockReadEpoch;
20use risingwave_pb::id::TableId;
21use risingwave_rpc_client::MetaClient;
22use risingwave_storage::StateStore;
23use risingwave_storage::hummock::HummockStorage;
24use risingwave_storage::monitor::MonitoredStateStore;
25use risingwave_storage::store::PrefetchOptions;
26use risingwave_storage::table::TableDistribution;
27use risingwave_storage::table::batch_table::BatchTable;
28use risingwave_stream::common::table::state_table::{StateTable, StateTableBuilder};
29
30use crate::CtlContext;
31use crate::common::HummockServiceOpts;
32
33pub async fn get_table_catalog(meta: MetaClient, mv_name: String) -> Result<TableCatalog> {
34 let mvs = meta.risectl_list_state_tables().await?;
35 let mv = mvs
36 .iter()
37 .find(|x| x.name == mv_name)
38 .ok_or_else(|| anyhow!("mv not found"))?
39 .clone();
40 Ok(TableCatalog::from(&mv))
41}
42
43pub async fn get_table_catalog_by_id(meta: MetaClient, table_id: TableId) -> Result<TableCatalog> {
44 let mvs = meta.risectl_list_state_tables().await?;
45 let mv = mvs
46 .iter()
47 .find(|x| x.id == table_id)
48 .ok_or_else(|| anyhow!("mv not found"))?
49 .clone();
50 Ok(TableCatalog::from(&mv))
51}
52
53pub fn print_table_catalog(table: &TableCatalog) {
54 println!("{:#?}", table);
55}
56
57pub async fn make_state_table<S: StateStore>(hummock: S, table: &TableCatalog) -> StateTable<S> {
59 StateTableBuilder::new(
60 &table.to_internal_table_prost(),
61 hummock,
62 Some(
63 TableDistribution::all(table.distribution_key().to_vec(), table.vnode_count())
65 .vnodes()
66 .clone(),
67 ),
68 )
69 .forbid_preload_all_rows()
70 .build()
71 .await
72}
73
74pub fn make_storage_table<S: StateStore>(
76 hummock: S,
77 table: &TableCatalog,
78) -> Result<BatchTable<S>> {
79 let output_columns_ids = table
80 .columns()
81 .iter()
82 .map(|x| x.column_desc.column_id)
83 .collect();
84 Ok(BatchTable::new_partial(
85 hummock,
86 output_columns_ids,
87 Some(Bitmap::ones(table.vnode_count()).into()),
88 &table.table_desc().try_to_protobuf()?,
89 ))
90}
91
92pub async fn scan(
93 context: &CtlContext,
94 mv_name: String,
95 data_dir: Option<String>,
96 use_new_object_prefix_strategy: bool,
97) -> Result<()> {
98 let meta_client = context.meta_client().await?;
99 let hummock = context
100 .hummock_store(HummockServiceOpts::from_env(
101 data_dir,
102 use_new_object_prefix_strategy,
103 )?)
104 .await?;
105 let table = get_table_catalog(meta_client, mv_name).await?;
106 do_scan(table, hummock).await
107}
108
109pub async fn scan_id(
110 context: &CtlContext,
111 table_id: TableId,
112 data_dir: Option<String>,
113 use_new_object_prefix_strategy: bool,
114) -> Result<()> {
115 let meta_client = context.meta_client().await?;
116 let hummock = context
117 .hummock_store(HummockServiceOpts::from_env(
118 data_dir,
119 use_new_object_prefix_strategy,
120 )?)
121 .await?;
122 let table = get_table_catalog_by_id(meta_client, table_id).await?;
123 do_scan(table, hummock).await
124}
125
126async fn do_scan(table: TableCatalog, hummock: MonitoredStateStore<HummockStorage>) -> Result<()> {
127 print_table_catalog(&table);
128
129 println!("Rows:");
130 let read_epoch = hummock
131 .inner()
132 .get_pinned_version()
133 .table_committed_epoch(table.id);
134 let Some(read_epoch) = read_epoch else {
135 println!(
136 "table {} with id {} not exist in the latest version",
137 table.name, table.id
138 );
139 return Ok(());
140 };
141 let storage_table = make_storage_table(hummock, &table)?;
142 let stream = storage_table
143 .batch_iter(
144 HummockReadEpoch::Committed(read_epoch),
145 true,
146 PrefetchOptions::prefetch_for_large_range_scan(),
147 )
148 .await?;
149 pin_mut!(stream);
150 while let Some(item) = stream.next().await {
151 println!("{:?}", item?);
152 }
153 Ok(())
154}