risingwave_frontend/optimizer/plan_node/generic/
project_set.rs1use pretty_xmlish::{Pretty, Str, XmlNode};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::DataType;
18
19use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
20use crate::expr::{Expr, ExprDisplay, ExprImpl, ExprRewriter, ExprVisitor};
21use crate::optimizer::optimizer_context::OptimizerContextRef;
22use crate::optimizer::plan_node::batch::BatchPlanRef;
23use crate::optimizer::plan_node::utils::childless_record;
24use crate::optimizer::property::{FunctionalDependencySet, Order};
25use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
26
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct ProjectSet<PlanRef> {
37 pub select_list: Vec<ExprImpl>,
38 pub input: PlanRef,
39}
40
41impl<PlanRef> ProjectSet<PlanRef> {
42 pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
43 self.select_list = self
44 .select_list
45 .iter()
46 .map(|e| r.rewrite_expr(e.clone()))
47 .collect();
48 }
49
50 pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
51 self.select_list.iter().for_each(|e| v.visit_expr(e));
52 }
53
54 pub fn decompose(self) -> (Vec<ExprImpl>, PlanRef) {
55 (self.select_list, self.input)
56 }
57}
58
59impl<PlanRef> DistillUnit for ProjectSet<PlanRef> {
60 fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
61 let fields = vec![("select_list", Pretty::debug(&self.select_list))];
62 childless_record(name, fields)
63 }
64}
65
66impl<PlanRef: GenericPlanRef> GenericPlanNode for ProjectSet<PlanRef> {
67 fn schema(&self) -> Schema {
68 let input_schema = self.input.schema();
69 let o2i = self.o2i_col_mapping();
70 let mut fields = vec![Field::with_name(DataType::Int64, "projected_row_id")];
71 fields.extend(self.select_list.iter().enumerate().map(|(idx, expr)| {
72 let idx = idx + 1;
73 let name = match o2i.try_map(idx) {
75 Some(input_idx) => input_schema.fields()[input_idx].name.clone(),
76 None => format!("{:?}", ExprDisplay { expr, input_schema }),
77 };
78 Field::with_name(expr.return_type(), name)
79 }));
80
81 Schema { fields }
82 }
83
84 fn stream_key(&self) -> Option<Vec<usize>> {
85 let i2o = self.i2o_col_mapping();
86 let mut pk = self
87 .input
88 .stream_key()?
89 .iter()
90 .map(|pk_col| i2o.try_map(*pk_col))
91 .collect::<Option<Vec<_>>>()
92 .unwrap_or_default();
93 pk.push(0);
95 Some(pk)
96 }
97
98 fn ctx(&self) -> OptimizerContextRef {
99 self.input.ctx()
100 }
101
102 fn functional_dependency(&self) -> FunctionalDependencySet {
103 let i2o = self.i2o_col_mapping();
104 i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone())
105 }
106}
107
108impl<PlanRef: GenericPlanRef> ProjectSet<PlanRef> {
109 pub fn o2i_col_mapping(&self) -> ColIndexMapping {
111 let input_len = self.input.schema().len();
112 let mut map = vec![None; 1 + self.select_list.len()];
113 for (i, item) in self.select_list.iter().enumerate() {
114 if let ExprImpl::InputRef(input) = item {
115 map[1 + i] = Some(input.index())
116 }
117 }
118 ColIndexMapping::new(map, input_len)
119 }
120
121 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
124 let input_len = self.input.schema().len();
125 let mut map = vec![None; input_len];
126 for (i, item) in self.select_list.iter().enumerate() {
127 if let ExprImpl::InputRef(input) = item {
128 map[input.index()] = Some(1 + i)
129 }
130 }
131 ColIndexMapping::new(map, 1 + self.select_list.len())
132 }
133}
134
135impl<PlanRef: BatchPlanRef> ProjectSet<PlanRef> {
136 pub fn get_out_column_index_order(&self) -> Order {
138 self.i2o_col_mapping()
139 .rewrite_provided_order(self.input.order())
140 }
141}