risingwave_frontend/optimizer/plan_node/
logical_insert.rs1use pretty_xmlish::XmlNode;
16use risingwave_common::catalog::TableVersionId;
17
18use super::generic::GenericPlanRef;
19use super::utils::{Distill, childless_record};
20use super::{
21 BatchInsert, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, LogicalProject,
22 PlanBase, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, gen_filter_and_pushdown,
23 generic,
24};
25use crate::catalog::TableId;
26use crate::error::Result;
27use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor};
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::{
30 ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
31};
32use crate::utils::{ColIndexMapping, Condition};
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
39pub struct LogicalInsert {
40 pub base: PlanBase<Logical>,
41 core: generic::Insert<PlanRef>,
42}
43
44impl LogicalInsert {
45 pub fn new(core: generic::Insert<PlanRef>) -> Self {
46 let base = PlanBase::new_logical_with_core(&core);
47 Self { base, core }
48 }
49
50 #[must_use]
52 pub fn column_indices(&self) -> Vec<usize> {
53 self.core.column_indices.clone()
54 }
55
56 #[must_use]
57 pub fn default_columns(&self) -> Vec<(usize, ExprImpl)> {
58 self.core.default_columns.clone()
59 }
60
61 #[must_use]
62 pub fn table_id(&self) -> TableId {
63 self.core.table_id
64 }
65
66 #[must_use]
67 pub fn row_id_index(&self) -> Option<usize> {
68 self.core.row_id_index
69 }
70
71 pub fn has_returning(&self) -> bool {
72 self.core.returning
73 }
74
75 pub fn table_version_id(&self) -> TableVersionId {
76 self.core.table_version_id
77 }
78}
79
80impl PlanTreeNodeUnary<Logical> for LogicalInsert {
81 fn input(&self) -> PlanRef {
82 self.core.input.clone()
83 }
84
85 fn clone_with_input(&self, input: PlanRef) -> Self {
86 let mut core = self.core.clone();
87 core.input = input;
88 Self::new(core)
89 }
90}
91
92impl_plan_tree_node_for_unary! { Logical, LogicalInsert}
93
94impl Distill for LogicalInsert {
95 fn distill<'a>(&self) -> XmlNode<'a> {
96 let vec = self
97 .core
98 .fields_pretty(self.base.ctx().is_explain_verbose());
99 childless_record("LogicalInsert", vec)
100 }
101}
102
103impl ColPrunable for LogicalInsert {
104 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
105 let pruned_input = {
106 let input = &self.core.input;
107 let required_cols: Vec<_> = (0..input.schema().len()).collect();
108 input.prune_col(&required_cols, ctx)
109 };
110
111 LogicalProject::with_out_col_idx(
113 self.clone_with_input(pruned_input).into(),
114 required_cols.iter().copied(),
115 )
116 .into()
117 }
118}
119
120impl ExprRewritable<Logical> for LogicalInsert {
121 fn has_rewritable_expr(&self) -> bool {
122 true
123 }
124
125 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
126 let mut new = self.clone();
127 new.core.default_columns = new
128 .core
129 .default_columns
130 .into_iter()
131 .map(|(c, e)| (c, r.rewrite_expr(e)))
132 .collect();
133 new.into()
134 }
135}
136
137impl ExprVisitable for LogicalInsert {
138 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
139 self.core
140 .default_columns
141 .iter()
142 .for_each(|(_, e)| v.visit_expr(e));
143 }
144}
145
146impl PredicatePushdown for LogicalInsert {
147 fn predicate_pushdown(
148 &self,
149 predicate: Condition,
150 ctx: &mut PredicatePushdownContext,
151 ) -> PlanRef {
152 gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
153 }
154}
155
156impl ToBatch for LogicalInsert {
157 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
158 let new_input = self.input().to_batch()?;
159 let core = self.core.clone_with_input(new_input);
160 Ok(BatchInsert::new(core).into())
161 }
162}
163
164impl ToStream for LogicalInsert {
165 fn to_stream(
166 &self,
167 _ctx: &mut ToStreamContext,
168 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
169 unreachable!("insert should always be converted to batch plan");
170 }
171
172 fn logical_rewrite_for_stream(
173 &self,
174 _ctx: &mut RewriteStreamContext,
175 ) -> Result<(PlanRef, ColIndexMapping)> {
176 unreachable!("insert should always be converted to batch plan");
177 }
178}