risingwave_frontend/optimizer/plan_node/
batch_insert.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::InsertNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18use risingwave_pb::plan_common::{DefaultColumns, IndexAndExpr};
19
20use super::batch::prelude::*;
21use super::utils::{Distill, childless_record};
22use super::{ExprRewritable, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, generic};
23use crate::error::Result;
24use crate::expr::Expr;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::{PlanBase, ToLocalBatch, utils};
27use crate::optimizer::plan_visitor::DistributedDmlVisitor;
28use crate::optimizer::property::{Distribution, Order, RequiredDist};
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct BatchInsert {
33 pub base: PlanBase<Batch>,
34 pub core: generic::Insert<PlanRef>,
35}
36
37impl BatchInsert {
38 pub fn new(core: generic::Insert<PlanRef>) -> Self {
39 let base: PlanBase<Batch> =
40 PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());
41
42 BatchInsert { base, core }
43 }
44}
45
46impl Distill for BatchInsert {
47 fn distill<'a>(&self) -> XmlNode<'a> {
48 let vec = self
49 .core
50 .fields_pretty(self.base.ctx().is_explain_verbose());
51 childless_record("BatchInsert", vec)
52 }
53}
54
55impl PlanTreeNodeUnary for BatchInsert {
56 fn input(&self) -> PlanRef {
57 self.core.input.clone()
58 }
59
60 fn clone_with_input(&self, input: PlanRef) -> Self {
61 let mut core = self.core.clone();
62 core.input = input;
63 Self::new(core)
64 }
65}
66
67impl_plan_tree_node_for_unary! { BatchInsert }
68
69impl ToDistributedBatch for BatchInsert {
70 fn to_distributed(&self) -> Result<PlanRef> {
71 if DistributedDmlVisitor::dml_should_run_in_distributed(self.input()) {
72 let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
74 (0..self.input().schema().len()).collect(),
75 ))
76 .enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
77 let new_insert: PlanRef = self.clone_with_input(new_input).into();
78 if self.core.returning {
79 Ok(new_insert)
80 } else {
81 utils::sum_affected_row(new_insert)
82 }
83 } else {
84 let new_input = RequiredDist::single()
85 .enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
86 Ok(self.clone_with_input(new_input).into())
87 }
88 }
89}
90
91impl ToBatchPb for BatchInsert {
92 fn to_batch_prost_body(&self) -> NodeBody {
93 let column_indices = self.core.column_indices.iter().map(|&i| i as u32).collect();
94
95 let default_columns = &self.core.default_columns;
96 let has_default_columns = !default_columns.is_empty();
97 let default_columns = DefaultColumns {
98 default_columns: default_columns
99 .iter()
100 .map(|(i, expr)| IndexAndExpr {
101 index: *i as u32,
102 expr: Some(expr.to_expr_proto()),
103 })
104 .collect(),
105 };
106 NodeBody::Insert(InsertNode {
107 table_id: self.core.table_id.table_id(),
108 table_version_id: self.core.table_version_id,
109 column_indices,
110 default_columns: if has_default_columns {
111 Some(default_columns)
112 } else {
113 None
114 },
115 row_id_index: self.core.row_id_index.map(|index| index as _),
116 returning: self.core.returning,
117 session_id: self.base.ctx().session_ctx().session_id().0 as u32,
118 })
119 }
120}
121
122impl ToLocalBatch for BatchInsert {
123 fn to_local(&self) -> Result<PlanRef> {
124 let new_input = RequiredDist::single()
125 .enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
126 Ok(self.clone_with_input(new_input).into())
127 }
128}
129
130impl ExprRewritable for BatchInsert {}
131
132impl ExprVisitable for BatchInsert {}