risingwave_frontend/optimizer/plan_node/generic/
project_set.rs1use pretty_xmlish::{Pretty, Str, XmlNode};
16use risingwave_common::catalog::{Field, PROJECTED_ROW_ID_COLUMN_NAME, Schema};
17use risingwave_common::types::DataType;
18
19use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
20use crate::expr::{Expr, ExprDisplay, ExprImpl, ExprRewriter, ExprVisitor};
21use crate::optimizer::optimizer_context::OptimizerContextRef;
22use crate::optimizer::plan_node::BatchPlanRef;
23use crate::optimizer::plan_node::batch::BatchPlanNodeMetadata;
24use crate::optimizer::plan_node::utils::childless_record;
25use crate::optimizer::property::{FunctionalDependencySet, Order};
26use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct ProjectSet<PlanRef> {
38 pub select_list: Vec<ExprImpl>,
39 pub input: PlanRef,
40}
41
42impl<PlanRef> ProjectSet<PlanRef> {
43 pub(crate) fn clone_with_input<OtherPlanRef>(
44 &self,
45 input: OtherPlanRef,
46 ) -> ProjectSet<OtherPlanRef> {
47 ProjectSet {
48 select_list: self.select_list.clone(),
49 input,
50 }
51 }
52
53 pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
54 self.select_list = self
55 .select_list
56 .iter()
57 .map(|e| r.rewrite_expr(e.clone()))
58 .collect();
59 }
60
61 pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
62 self.select_list.iter().for_each(|e| v.visit_expr(e));
63 }
64
65 pub fn decompose(self) -> (Vec<ExprImpl>, PlanRef) {
66 (self.select_list, self.input)
67 }
68}
69
70impl<PlanRef> DistillUnit for ProjectSet<PlanRef> {
71 fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
72 let fields = vec![("select_list", Pretty::debug(&self.select_list))];
73 childless_record(name, fields)
74 }
75}
76
77impl<PlanRef: GenericPlanRef> GenericPlanNode for ProjectSet<PlanRef> {
78 fn schema(&self) -> Schema {
79 let input_schema = self.input.schema();
80 let o2i = self.o2i_col_mapping();
81 let mut fields = vec![Field::with_name(
82 DataType::Int64,
83 PROJECTED_ROW_ID_COLUMN_NAME,
84 )];
85 fields.extend(self.select_list.iter().enumerate().map(|(idx, expr)| {
86 let idx = idx + 1;
87 let name = match o2i.try_map(idx) {
89 Some(input_idx) => input_schema.fields()[input_idx].name.clone(),
90 None => format!("{:?}", ExprDisplay { expr, input_schema }),
91 };
92 Field::with_name(expr.return_type(), name)
93 }));
94
95 Schema { fields }
96 }
97
98 fn stream_key(&self) -> Option<Vec<usize>> {
99 let i2o = self.i2o_col_mapping();
100 let mut pk = self
101 .input
102 .stream_key()?
103 .iter()
104 .map(|pk_col| i2o.try_map(*pk_col))
105 .collect::<Option<Vec<_>>>()
106 .unwrap_or_default();
107 pk.push(0);
109 Some(pk)
110 }
111
112 fn ctx(&self) -> OptimizerContextRef {
113 self.input.ctx()
114 }
115
116 fn functional_dependency(&self) -> FunctionalDependencySet {
117 let i2o = self.i2o_col_mapping();
118 i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone())
119 }
120}
121
122impl<PlanRef: GenericPlanRef> ProjectSet<PlanRef> {
123 pub fn o2i_col_mapping(&self) -> ColIndexMapping {
125 let input_len = self.input.schema().len();
126 let mut map = vec![None; 1 + self.select_list.len()];
127 for (i, item) in self.select_list.iter().enumerate() {
128 if let ExprImpl::InputRef(input) = item {
129 map[1 + i] = Some(input.index())
130 }
131 }
132 ColIndexMapping::new(map, input_len)
133 }
134
135 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
138 let input_len = self.input.schema().len();
139 let mut map = vec![None; input_len];
140 for (i, item) in self.select_list.iter().enumerate() {
141 if let ExprImpl::InputRef(input) = item {
142 map[input.index()] = Some(1 + i)
143 }
144 }
145 ColIndexMapping::new(map, 1 + self.select_list.len())
146 }
147}
148
149impl ProjectSet<BatchPlanRef> {
150 pub fn get_out_column_index_order(&self) -> Order {
152 self.i2o_col_mapping()
153 .rewrite_provided_order(self.input.order())
154 }
155}