risingwave_frontend/optimizer/plan_node/generic/
sys_scan.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use educe::Educe;
19use pretty_xmlish::Pretty;
20use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema};
21use risingwave_common::util::column_index_mapping::ColIndexMapping;
22
23use super::GenericPlanNode;
24use crate::catalog::ColumnId;
25use crate::catalog::system_catalog::SystemTableCatalog;
26use crate::optimizer::optimizer_context::OptimizerContextRef;
27use crate::optimizer::property::{Cardinality, FunctionalDependencySet};
28
29#[derive(Debug, Clone, Educe)]
31#[educe(PartialEq, Eq, Hash)]
32pub struct SysScan {
33 pub output_col_idx: Vec<usize>,
34 pub table: Arc<SystemTableCatalog>,
35 pub chunk_size: Option<u32>,
37 pub table_cardinality: Cardinality,
39 #[educe(PartialEq(ignore))]
40 #[educe(Hash(ignore))]
41 pub ctx: OptimizerContextRef,
42}
43
44impl SysScan {
45 pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
46 self.output_col_idx
47 .iter()
48 .map(|&i| format!("{}.{}", self.table.name, self.get_table_columns()[i].name))
49 .collect()
50 }
51
52 pub(crate) fn column_names(&self) -> Vec<String> {
53 self.output_col_idx
54 .iter()
55 .map(|&i| self.get_table_columns()[i].name.clone())
56 .collect()
57 }
58
59 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
61 ColIndexMapping::with_remaining_columns(
62 &self.output_col_idx,
63 self.get_table_columns().len(),
64 )
65 }
66
67 #[allow(clippy::too_many_arguments)]
69 pub(crate) fn new(
70 output_col_idx: Vec<usize>, table: Arc<SystemTableCatalog>,
72 ctx: OptimizerContextRef,
73 table_cardinality: Cardinality,
74 ) -> Self {
75 Self::new_inner(output_col_idx, table, ctx, table_cardinality)
76 }
77
78 #[allow(clippy::too_many_arguments)]
79 pub(crate) fn new_inner(
80 output_col_idx: Vec<usize>, table: Arc<SystemTableCatalog>,
82 ctx: OptimizerContextRef,
83 table_cardinality: Cardinality,
84 ) -> Self {
85 Self {
86 output_col_idx,
87 table,
88 chunk_size: None,
89 ctx,
90 table_cardinality,
91 }
92 }
93
94 pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> {
95 Pretty::Array(
96 match verbose {
97 true => self.column_names_with_table_prefix(),
98 false => self.column_names(),
99 }
100 .into_iter()
101 .map(Pretty::from)
102 .collect(),
103 )
104 }
105}
106
107impl GenericPlanNode for SysScan {
108 fn schema(&self) -> Schema {
109 let fields = self
110 .output_col_idx
111 .iter()
112 .map(|tb_idx| {
113 let col = &self.get_table_columns()[*tb_idx];
114 Field::from_with_table_name_prefix(col, &self.table.name)
115 })
116 .collect();
117 Schema { fields }
118 }
119
120 fn stream_key(&self) -> Option<Vec<usize>> {
121 let id_to_op_idx = Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table);
122 self.table
123 .pk
124 .iter()
125 .map(|&c| id_to_op_idx.get(&self.table.columns[c].column_id).copied())
126 .collect::<Option<Vec<_>>>()
127 }
128
129 fn ctx(&self) -> OptimizerContextRef {
130 self.ctx.clone()
131 }
132
133 fn functional_dependency(&self) -> FunctionalDependencySet {
134 let pk_indices = self.stream_key();
135 let col_num = self.output_col_idx.len();
136 match &pk_indices {
137 Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
138 None => FunctionalDependencySet::new(col_num),
139 }
140 }
141}
142
143impl SysScan {
144 pub fn get_table_columns(&self) -> &[ColumnCatalog] {
145 &self.table.columns
146 }
147
148 pub fn column_descs(&self) -> Vec<ColumnDesc> {
150 self.output_col_idx
151 .iter()
152 .map(|&i| self.get_table_columns()[i].column_desc.clone())
153 .collect()
154 }
155
156 pub fn get_id_to_op_idx_mapping(
158 output_col_idx: &[usize],
159 table: &SystemTableCatalog,
160 ) -> HashMap<ColumnId, usize> {
161 ColumnDesc::get_id_to_op_idx_mapping(&table.columns, Some(output_col_idx))
162 }
163}