risingwave_frontend/optimizer/plan_node/
logical_insert.rs1use pretty_xmlish::XmlNode;
16use risingwave_common::catalog::TableVersionId;
17
18use super::generic::GenericPlanRef;
19use super::utils::{Distill, childless_record};
20use super::{
21 BatchInsert, ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, PlanRef,
22 PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, gen_filter_and_pushdown, generic,
23};
24use crate::catalog::TableId;
25use crate::error::Result;
26use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor};
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::{
29 ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
30};
31use crate::utils::{ColIndexMapping, Condition};
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
38pub struct LogicalInsert {
39 pub base: PlanBase<Logical>,
40 core: generic::Insert<PlanRef>,
41}
42
43impl LogicalInsert {
44 pub fn new(core: generic::Insert<PlanRef>) -> Self {
45 let base = PlanBase::new_logical_with_core(&core);
46 Self { base, core }
47 }
48
49 #[must_use]
51 pub fn column_indices(&self) -> Vec<usize> {
52 self.core.column_indices.clone()
53 }
54
55 #[must_use]
56 pub fn default_columns(&self) -> Vec<(usize, ExprImpl)> {
57 self.core.default_columns.clone()
58 }
59
60 #[must_use]
61 pub fn table_id(&self) -> TableId {
62 self.core.table_id
63 }
64
65 #[must_use]
66 pub fn row_id_index(&self) -> Option<usize> {
67 self.core.row_id_index
68 }
69
70 pub fn has_returning(&self) -> bool {
71 self.core.returning
72 }
73
74 pub fn table_version_id(&self) -> TableVersionId {
75 self.core.table_version_id
76 }
77}
78
79impl PlanTreeNodeUnary for LogicalInsert {
80 fn input(&self) -> PlanRef {
81 self.core.input.clone()
82 }
83
84 fn clone_with_input(&self, input: PlanRef) -> Self {
85 let mut core = self.core.clone();
86 core.input = input;
87 Self::new(core)
88 }
89}
90
91impl_plan_tree_node_for_unary! {LogicalInsert}
92
93impl Distill for LogicalInsert {
94 fn distill<'a>(&self) -> XmlNode<'a> {
95 let vec = self
96 .core
97 .fields_pretty(self.base.ctx().is_explain_verbose());
98 childless_record("LogicalInsert", vec)
99 }
100}
101
102impl ColPrunable for LogicalInsert {
103 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
104 let pruned_input = {
105 let input = &self.core.input;
106 let required_cols: Vec<_> = (0..input.schema().len()).collect();
107 input.prune_col(&required_cols, ctx)
108 };
109
110 LogicalProject::with_out_col_idx(
112 self.clone_with_input(pruned_input).into(),
113 required_cols.iter().copied(),
114 )
115 .into()
116 }
117}
118
119impl ExprRewritable for LogicalInsert {
120 fn has_rewritable_expr(&self) -> bool {
121 true
122 }
123
124 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
125 let mut new = self.clone();
126 new.core.default_columns = new
127 .core
128 .default_columns
129 .into_iter()
130 .map(|(c, e)| (c, r.rewrite_expr(e)))
131 .collect();
132 new.into()
133 }
134}
135
136impl ExprVisitable for LogicalInsert {
137 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
138 self.core
139 .default_columns
140 .iter()
141 .for_each(|(_, e)| v.visit_expr(e));
142 }
143}
144
145impl PredicatePushdown for LogicalInsert {
146 fn predicate_pushdown(
147 &self,
148 predicate: Condition,
149 ctx: &mut PredicatePushdownContext,
150 ) -> PlanRef {
151 gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
152 }
153}
154
155impl ToBatch for LogicalInsert {
156 fn to_batch(&self) -> Result<PlanRef> {
157 let new_input = self.input().to_batch()?;
158 let mut core = self.core.clone();
159 core.input = new_input;
160 Ok(BatchInsert::new(core).into())
161 }
162}
163
164impl ToStream for LogicalInsert {
165 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
166 unreachable!("insert should always be converted to batch plan");
167 }
168
169 fn logical_rewrite_for_stream(
170 &self,
171 _ctx: &mut RewriteStreamContext,
172 ) -> Result<(PlanRef, ColIndexMapping)> {
173 unreachable!("insert should always be converted to batch plan");
174 }
175}