risingwave_frontend/optimizer/plan_node/
batch_insert.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::InsertNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18use risingwave_pb::plan_common::{DefaultColumns, IndexAndExpr};
19
20use super::batch::prelude::*;
21use super::utils::{Distill, childless_record};
22use super::{
23 BatchPlanRef as PlanRef, ExprRewritable, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
24 generic,
25};
26use crate::error::Result;
27use crate::expr::Expr;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::{PlanBase, ToLocalBatch, utils};
30use crate::optimizer::plan_visitor::DistributedDmlVisitor;
31use crate::optimizer::property::{Distribution, Order, RequiredDist};
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct BatchInsert {
36 pub base: PlanBase<Batch>,
37 pub core: generic::Insert<PlanRef>,
38}
39
40impl BatchInsert {
41 pub fn new(core: generic::Insert<PlanRef>) -> Self {
42 let base: PlanBase<Batch> =
43 PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());
44
45 BatchInsert { base, core }
46 }
47}
48
49impl Distill for BatchInsert {
50 fn distill<'a>(&self) -> XmlNode<'a> {
51 let vec = self
52 .core
53 .fields_pretty(self.base.ctx().is_explain_verbose());
54 childless_record("BatchInsert", vec)
55 }
56}
57
58impl PlanTreeNodeUnary<Batch> for BatchInsert {
59 fn input(&self) -> PlanRef {
60 self.core.input.clone()
61 }
62
63 fn clone_with_input(&self, input: PlanRef) -> Self {
64 let mut core = self.core.clone();
65 core.input = input;
66 Self::new(core)
67 }
68}
69
70impl_plan_tree_node_for_unary! { Batch, BatchInsert }
71
72impl ToDistributedBatch for BatchInsert {
73 fn to_distributed(&self) -> Result<PlanRef> {
74 if DistributedDmlVisitor::dml_should_run_in_distributed(self.input()) {
75 let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
77 (0..self.input().schema().len()).collect(),
78 ))
79 .batch_enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
80 let new_insert: PlanRef = self.clone_with_input(new_input).into();
81 if self.core.returning {
82 Ok(new_insert)
83 } else {
84 utils::sum_affected_row(new_insert)
85 }
86 } else {
87 let new_input = RequiredDist::single()
88 .batch_enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
89 Ok(self.clone_with_input(new_input).into())
90 }
91 }
92}
93
94impl ToBatchPb for BatchInsert {
95 fn to_batch_prost_body(&self) -> NodeBody {
96 let column_indices = self.core.column_indices.iter().map(|&i| i as u32).collect();
97
98 let default_columns = &self.core.default_columns;
99 let has_default_columns = !default_columns.is_empty();
100 let default_columns = DefaultColumns {
101 default_columns: default_columns
102 .iter()
103 .map(|(i, expr)| IndexAndExpr {
104 index: *i as u32,
105 expr: Some(expr.to_expr_proto()),
106 })
107 .collect(),
108 };
109 NodeBody::Insert(InsertNode {
110 table_id: self.core.table_id.table_id(),
111 table_version_id: self.core.table_version_id,
112 column_indices,
113 default_columns: if has_default_columns {
114 Some(default_columns)
115 } else {
116 None
117 },
118 row_id_index: self.core.row_id_index.map(|index| index as _),
119 returning: self.core.returning,
120 session_id: self.base.ctx().session_ctx().session_id().0 as u32,
121 })
122 }
123}
124
125impl ToLocalBatch for BatchInsert {
126 fn to_local(&self) -> Result<PlanRef> {
127 let new_input = RequiredDist::single()
128 .batch_enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
129 Ok(self.clone_with_input(new_input).into())
130 }
131}
132
133impl ExprRewritable<Batch> for BatchInsert {}
134
135impl ExprVisitable for BatchInsert {}