risingwave_frontend/optimizer/plan_node/
batch_insert.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::InsertNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18use risingwave_pb::plan_common::{DefaultColumns, IndexAndExpr};
19
20use super::batch::prelude::*;
21use super::utils::{Distill, childless_record};
22use super::{
23    BatchPlanRef as PlanRef, ExprRewritable, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
24    generic,
25};
26use crate::error::Result;
27use crate::expr::Expr;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::{PlanBase, ToLocalBatch, utils};
30use crate::optimizer::plan_visitor::DistributedDmlVisitor;
31use crate::optimizer::property::{Distribution, Order, RequiredDist};
32
/// `BatchInsert` implements [`super::LogicalInsert`]
///
/// Batch-plan node for an insert. It pairs the shared batch plan-node state
/// (`base`) with the generic insert description (`core`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchInsert {
    /// Shared batch plan-node state. [`BatchInsert::new`] builds it from `core`,
    /// the input's distribution and `Order::any()`.
    pub base: PlanBase<Batch>,
    /// Generic insert description. This file reads its `input`, `table_id`,
    /// `table_version_id`, `column_indices`, `default_columns`, `row_id_index`
    /// and `returning` fields.
    pub core: generic::Insert<PlanRef>,
}
39
40impl BatchInsert {
41    pub fn new(core: generic::Insert<PlanRef>) -> Self {
42        let base: PlanBase<Batch> =
43            PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());
44
45        BatchInsert { base, core }
46    }
47}
48
49impl Distill for BatchInsert {
50    fn distill<'a>(&self) -> XmlNode<'a> {
51        let vec = self
52            .core
53            .fields_pretty(self.base.ctx().is_explain_verbose());
54        childless_record("BatchInsert", vec)
55    }
56}
57
58impl PlanTreeNodeUnary<Batch> for BatchInsert {
59    fn input(&self) -> PlanRef {
60        self.core.input.clone()
61    }
62
63    fn clone_with_input(&self, input: PlanRef) -> Self {
64        let mut core = self.core.clone();
65        core.input = input;
66        Self::new(core)
67    }
68}
69
// NOTE(review): presumably expands to the generic plan-tree-node impls for
// `BatchInsert`, built on its `PlanTreeNodeUnary<Batch>` impl — confirm against
// the macro definition, which is not part of this file.
impl_plan_tree_node_for_unary! { Batch, BatchInsert }
71
72impl ToDistributedBatch for BatchInsert {
73    fn to_distributed(&self) -> Result<PlanRef> {
74        if DistributedDmlVisitor::dml_should_run_in_distributed(self.input()) {
75            // Add an hash shuffle between the insert and its input.
76            let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
77                (0..self.input().schema().len()).collect(),
78            ))
79            .batch_enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
80            let new_insert: PlanRef = self.clone_with_input(new_input).into();
81            if self.core.returning {
82                Ok(new_insert)
83            } else {
84                utils::sum_affected_row(new_insert)
85            }
86        } else {
87            let new_input = RequiredDist::single()
88                .batch_enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
89            Ok(self.clone_with_input(new_input).into())
90        }
91    }
92}
93
94impl ToBatchPb for BatchInsert {
95    fn to_batch_prost_body(&self) -> NodeBody {
96        let wait_for_persistence = self.base.ctx().batch_plan_dml_wait_persistence();
97        let column_indices = self.core.column_indices.iter().map(|&i| i as u32).collect();
98
99        let default_columns = &self.core.default_columns;
100        let has_default_columns = !default_columns.is_empty();
101        let default_columns = DefaultColumns {
102            default_columns: default_columns
103                .iter()
104                .map(|(i, expr)| IndexAndExpr {
105                    index: *i as u32,
106                    expr: Some(expr.to_expr_proto()),
107                })
108                .collect(),
109        };
110        NodeBody::Insert(InsertNode {
111            table_id: self.core.table_id,
112            table_version_id: self.core.table_version_id,
113            column_indices,
114            default_columns: if has_default_columns {
115                Some(default_columns)
116            } else {
117                None
118            },
119            row_id_index: self.core.row_id_index.map(|index| index as _),
120            returning: self.core.returning,
121            session_id: self.base.ctx().session_ctx().session_id().0 as u32,
122            wait_for_persistence,
123        })
124    }
125}
126
127impl ToLocalBatch for BatchInsert {
128    fn to_local(&self) -> Result<PlanRef> {
129        let new_input = RequiredDist::single()
130            .batch_enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
131        Ok(self.clone_with_input(new_input).into())
132    }
133}
134
// Empty impl: `BatchInsert` overrides nothing and relies entirely on the
// trait's default methods.
// NOTE(review): `core.default_columns` holds expressions; confirm that leaving
// them out of expression rewriting is intended.
impl ExprRewritable<Batch> for BatchInsert {}
136
// Empty impl: `BatchInsert` overrides nothing and relies entirely on the
// trait's default methods.
// NOTE(review): `core.default_columns` holds expressions; confirm that not
// visiting them is intended.
impl ExprVisitable for BatchInsert {}