risingwave_frontend/optimizer/plan_node/generic/
sys_scan.rs1use std::collections::HashMap;
16use std::rc::Rc;
17
18use educe::Educe;
19use pretty_xmlish::Pretty;
20use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc};
21use risingwave_common::util::column_index_mapping::ColIndexMapping;
22use risingwave_common::util::sort_util::ColumnOrder;
23
24use super::GenericPlanNode;
25use crate::catalog::ColumnId;
26use crate::expr::{ExprRewriter, ExprVisitor};
27use crate::optimizer::optimizer_context::OptimizerContextRef;
28use crate::optimizer::property::{Cardinality, FunctionalDependencySet, Order};
29use crate::utils::{ColIndexMappingRewriteExt, Condition};
30
/// Generic (convention-independent) description of a scan over the table
/// described by `table_desc`.
///
/// `PartialEq`, `Eq` and `Hash` are derived through `educe` rather than `derive`
/// so that the optimizer context `ctx` can be excluded from equality and hashing.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct SysScan {
    /// Name of the scanned table; used as the prefix of output field names.
    pub table_name: String,
    /// Table column indices the scan must read: the entries of `output_col_idx`,
    /// followed by every other table column referenced by `predicate`
    /// (computed in `new_inner`).
    pub required_col_idx: Vec<usize>,
    /// Indices into `table_desc.columns` of the columns produced as output, in
    /// output order.
    pub output_col_idx: Vec<usize>,
    /// Descriptor of the scanned table (its columns, `pk` and `stream_key` are read here).
    pub table_desc: Rc<TableDesc>,
    /// Predicate attached to the scan. Its input refs index table columns
    /// (`table_desc.columns`), not output columns.
    pub predicate: Condition,
    /// Optional chunk size; the constructors in this file always set it to `None`.
    /// NOTE(review): presumably an execution chunk-size hint set later — confirm.
    pub chunk_size: Option<u32>,
    /// Cardinality of the scanned table, as supplied to the constructor.
    pub table_cardinality: Cardinality,
    /// Optimizer context; ignored by `PartialEq` and `Hash`.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,
}
51
52impl SysScan {
53 pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
54 self.predicate = self.predicate.clone().rewrite_expr(r);
55 }
56
57 pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
58 self.predicate.visit_expr(v);
59 }
60
61 pub fn output_column_ids(&self) -> Vec<ColumnId> {
63 self.output_col_idx
64 .iter()
65 .map(|i| self.get_table_columns()[*i].column_id)
66 .collect()
67 }
68
69 pub fn primary_key(&self) -> &[ColumnOrder] {
70 &self.table_desc.pk
71 }
72
73 pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
74 self.output_col_idx
75 .iter()
76 .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
77 .collect()
78 }
79
80 pub(crate) fn column_names(&self) -> Vec<String> {
81 self.output_col_idx
82 .iter()
83 .map(|&i| self.get_table_columns()[i].name.clone())
84 .collect()
85 }
86
87 pub(crate) fn order_names(&self) -> Vec<String> {
88 self.table_desc
89 .order_column_indices()
90 .iter()
91 .map(|&i| self.get_table_columns()[i].name.clone())
92 .collect()
93 }
94
95 pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
96 self.table_desc
97 .order_column_indices()
98 .iter()
99 .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
100 .collect()
101 }
102
103 pub fn get_out_column_index_order(&self) -> Order {
106 let id_to_tb_idx = self.table_desc.get_id_to_op_idx_mapping();
107 let order = Order::new(
108 self.table_desc
109 .pk
110 .iter()
111 .map(|order| {
112 let idx = id_to_tb_idx
113 .get(&self.table_desc.columns[order.column_index].column_id)
114 .unwrap();
115 ColumnOrder::new(*idx, order.order_type)
116 })
117 .collect(),
118 );
119 self.i2o_col_mapping().rewrite_provided_order(&order)
120 }
121
122 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
124 ColIndexMapping::with_remaining_columns(
125 &self.output_col_idx,
126 self.get_table_columns().len(),
127 )
128 }
129
130 pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
132 let mut ids = self.output_column_ids();
133 for column_order in self.primary_key() {
134 let id = self.get_table_columns()[column_order.column_index].column_id;
135 if !ids.contains(&id) {
136 ids.push(id);
137 }
138 }
139 ids
140 }
141
142 #[allow(clippy::too_many_arguments)]
144 pub(crate) fn new(
145 table_name: String,
146 output_col_idx: Vec<usize>, table_desc: Rc<TableDesc>,
148 ctx: OptimizerContextRef,
149 predicate: Condition, table_cardinality: Cardinality,
151 ) -> Self {
152 Self::new_inner(
153 table_name,
154 output_col_idx,
155 table_desc,
156 ctx,
157 predicate,
158 table_cardinality,
159 )
160 }
161
162 #[allow(clippy::too_many_arguments)]
163 pub(crate) fn new_inner(
164 table_name: String,
165 output_col_idx: Vec<usize>, table_desc: Rc<TableDesc>,
167 ctx: OptimizerContextRef,
168 predicate: Condition, table_cardinality: Cardinality,
170 ) -> Self {
171 let mut required_col_idx = output_col_idx.clone();
180 let predicate_col_idx = predicate.collect_input_refs(table_desc.columns.len());
181 predicate_col_idx.ones().for_each(|idx| {
182 if !required_col_idx.contains(&idx) {
183 required_col_idx.push(idx);
184 }
185 });
186
187 Self {
188 table_name,
189 required_col_idx,
190 output_col_idx,
191 table_desc,
192 predicate,
193 chunk_size: None,
194 ctx,
195 table_cardinality,
196 }
197 }
198
199 pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
200 Pretty::Array(
201 match verbose {
202 true => self.column_names_with_table_prefix(),
203 false => self.column_names(),
204 }
205 .into_iter()
206 .map(Pretty::from)
207 .collect(),
208 )
209 }
210
211 pub(crate) fn fields_pretty_schema(&self) -> Schema {
212 let fields = self
213 .table_desc
214 .columns
215 .iter()
216 .map(|col| Field::from_with_table_name_prefix(col, &self.table_name))
217 .collect();
218 Schema { fields }
219 }
220}
221
222impl GenericPlanNode for SysScan {
223 fn schema(&self) -> Schema {
224 let fields = self
225 .output_col_idx
226 .iter()
227 .map(|tb_idx| {
228 let col = &self.get_table_columns()[*tb_idx];
229 Field::from_with_table_name_prefix(col, &self.table_name)
230 })
231 .collect();
232 Schema { fields }
233 }
234
235 fn stream_key(&self) -> Option<Vec<usize>> {
236 let id_to_op_idx = Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table_desc);
237 self.table_desc
238 .stream_key
239 .iter()
240 .map(|&c| {
241 id_to_op_idx
242 .get(&self.table_desc.columns[c].column_id)
243 .copied()
244 })
245 .collect::<Option<Vec<_>>>()
246 }
247
248 fn ctx(&self) -> OptimizerContextRef {
249 self.ctx.clone()
250 }
251
252 fn functional_dependency(&self) -> FunctionalDependencySet {
253 let pk_indices = self.stream_key();
254 let col_num = self.output_col_idx.len();
255 match &pk_indices {
256 Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
257 None => FunctionalDependencySet::new(col_num),
258 }
259 }
260}
261
262impl SysScan {
263 pub fn get_table_columns(&self) -> &[ColumnDesc] {
264 &self.table_desc.columns
265 }
266
267 pub fn column_descs(&self) -> Vec<ColumnDesc> {
269 self.output_col_idx
270 .iter()
271 .map(|&i| self.get_table_columns()[i].clone())
272 .collect()
273 }
274
275 pub fn get_id_to_op_idx_mapping(
277 output_col_idx: &[usize],
278 table_desc: &Rc<TableDesc>,
279 ) -> HashMap<ColumnId, usize> {
280 let mut id_to_op_idx = HashMap::new();
281 output_col_idx
282 .iter()
283 .enumerate()
284 .for_each(|(op_idx, tb_idx)| {
285 let col = &table_desc.columns[*tb_idx];
286 id_to_op_idx.insert(col.column_id, op_idx);
287 });
288 id_to_op_idx
289 }
290}