risingwave_frontend/optimizer/plan_node/
batch_delete.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::batch_plan::DeleteNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21    ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, generic,
22};
23use crate::error::Result;
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::{ToLocalBatch, utils};
26use crate::optimizer::plan_visitor::DistributedDmlVisitor;
27use crate::optimizer::property::{Distribution, Order, RequiredDist};
28
29/// `BatchDelete` implements [`super::LogicalDelete`]
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct BatchDelete {
32    pub base: PlanBase<Batch>,
33    pub core: generic::Delete<PlanRef>,
34}
35
36impl BatchDelete {
37    pub fn new(core: generic::Delete<PlanRef>) -> Self {
38        let base =
39            PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());
40        Self { base, core }
41    }
42}
43
44impl PlanTreeNodeUnary for BatchDelete {
45    fn input(&self) -> PlanRef {
46        self.core.input.clone()
47    }
48
49    fn clone_with_input(&self, input: PlanRef) -> Self {
50        let mut core = self.core.clone();
51        core.input = input;
52        Self::new(core)
53    }
54}
55
56impl_plan_tree_node_for_unary! { BatchDelete }
57impl_distill_by_unit!(BatchDelete, core, "BatchDelete");
58
59impl ToDistributedBatch for BatchDelete {
60    fn to_distributed(&self) -> Result<PlanRef> {
61        if DistributedDmlVisitor::dml_should_run_in_distributed(self.input()) {
62            // Add an hash shuffle between the delete and its input.
63            let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
64                (0..self.input().schema().len()).collect(),
65            ))
66            .enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
67            let new_delete: PlanRef = self.clone_with_input(new_input).into();
68            if self.core.returning {
69                Ok(new_delete)
70            } else {
71                utils::sum_affected_row(new_delete)
72            }
73        } else {
74            let new_input = RequiredDist::single()
75                .enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
76            Ok(self.clone_with_input(new_input).into())
77        }
78    }
79}
80
81impl ToBatchPb for BatchDelete {
82    fn to_batch_prost_body(&self) -> NodeBody {
83        NodeBody::Delete(DeleteNode {
84            table_id: self.core.table_id.table_id(),
85            table_version_id: self.core.table_version_id,
86            returning: self.core.returning,
87            session_id: self.base.ctx().session_ctx().session_id().0 as u32,
88        })
89    }
90}
91
92impl ToLocalBatch for BatchDelete {
93    fn to_local(&self) -> Result<PlanRef> {
94        let new_input = RequiredDist::single()
95            .enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
96        Ok(self.clone_with_input(new_input).into())
97    }
98}
99
100impl ExprRewritable for BatchDelete {}
101
102impl ExprVisitable for BatchDelete {}