risingwave_frontend/optimizer/plan_node/generic/
update.rs1use std::hash::Hash;
16
17use educe::Educe;
18use pretty_xmlish::{Pretty, Str, XmlNode};
19use risingwave_common::catalog::{Field, Schema, TableVersionId};
20use risingwave_common::types::DataType;
21
22use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
23use crate::OptimizerContextRef;
24use crate::catalog::TableId;
25use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
26use crate::optimizer::plan_node::utils::childless_record;
27use crate::optimizer::property::FunctionalDependencySet;
28
29#[derive(Debug, Clone, Educe)]
30#[educe(PartialEq, Eq, Hash)]
31pub struct Update<PlanRef: Eq + Hash> {
32 #[educe(PartialEq(ignore))]
33 #[educe(Hash(ignore))]
34 pub table_name: String, pub table_id: TableId,
36 pub table_version_id: TableVersionId,
37 pub input: PlanRef,
38 pub old_exprs: Vec<ExprImpl>,
39 pub new_exprs: Vec<ExprImpl>,
40 pub returning: bool,
41}
42
43impl<PlanRef: GenericPlanRef> Update<PlanRef> {
44 pub fn output_len(&self) -> usize {
45 if self.returning {
46 self.new_exprs.len()
47 } else {
48 1
49 }
50 }
51}
52impl<PlanRef: GenericPlanRef> GenericPlanNode for Update<PlanRef> {
53 fn functional_dependency(&self) -> FunctionalDependencySet {
54 FunctionalDependencySet::new(self.output_len())
55 }
56
57 fn schema(&self) -> Schema {
58 if self.returning {
59 Schema::new(
60 self.new_exprs
61 .iter()
62 .map(|e| Field::unnamed(e.return_type()))
63 .collect(),
64 )
65 } else {
66 Schema::new(vec![Field::unnamed(DataType::Int64)])
67 }
68 }
69
70 fn stream_key(&self) -> Option<Vec<usize>> {
71 None
72 }
73
74 fn ctx(&self) -> OptimizerContextRef {
75 self.input.ctx()
76 }
77}
78
79impl<PlanRef: Eq + Hash> Update<PlanRef> {
80 pub fn new(
81 input: PlanRef,
82 table_name: String,
83 table_id: TableId,
84 table_version_id: TableVersionId,
85 old_exprs: Vec<ExprImpl>,
86 new_exprs: Vec<ExprImpl>,
87 returning: bool,
88 ) -> Self {
89 Self {
90 table_name,
91 table_id,
92 table_version_id,
93 input,
94 old_exprs,
95 new_exprs,
96 returning,
97 }
98 }
99
100 pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
101 for exprs in [&mut self.old_exprs, &mut self.new_exprs] {
102 *exprs = exprs.iter().map(|e| r.rewrite_expr(e.clone())).collect();
103 }
104 }
105
106 pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
107 for exprs in [&self.old_exprs, &self.new_exprs] {
108 exprs.iter().for_each(|e| v.visit_expr(e));
109 }
110 }
111}
112
113impl<PlanRef: Eq + Hash> DistillUnit for Update<PlanRef> {
114 fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
115 let mut vec = Vec::with_capacity(if self.returning { 3 } else { 2 });
116 vec.push(("table", Pretty::from(self.table_name.clone())));
117 vec.push(("exprs", Pretty::debug(&self.new_exprs)));
118 if self.returning {
119 vec.push(("returning", Pretty::display(&true)));
120 }
121 childless_record(name, vec)
122 }
123}