risingwave_frontend/optimizer/plan_node/
batch_delete.rs1use risingwave_pb::batch_plan::DeleteNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
22    ToDistributedBatch, generic,
23};
24use crate::error::Result;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::{ToLocalBatch, utils};
27use crate::optimizer::plan_visitor::DistributedDmlVisitor;
28use crate::optimizer::property::{Distribution, Order, RequiredDist};
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct BatchDelete {
33    pub base: PlanBase<Batch>,
34    pub core: generic::Delete<PlanRef>,
35}
36
37impl BatchDelete {
38    pub fn new(core: generic::Delete<PlanRef>) -> Self {
39        let base =
40            PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());
41        Self { base, core }
42    }
43}
44
45impl PlanTreeNodeUnary<Batch> for BatchDelete {
46    fn input(&self) -> PlanRef {
47        self.core.input.clone()
48    }
49
50    fn clone_with_input(&self, input: PlanRef) -> Self {
51        let mut core = self.core.clone();
52        core.input = input;
53        Self::new(core)
54    }
55}
56
57impl_plan_tree_node_for_unary! { Batch, BatchDelete }
58impl_distill_by_unit!(BatchDelete, core, "BatchDelete");
59
60impl ToDistributedBatch for BatchDelete {
61    fn to_distributed(&self) -> Result<PlanRef> {
62        if DistributedDmlVisitor::dml_should_run_in_distributed(self.input()) {
63            let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
65                (0..self.input().schema().len()).collect(),
66            ))
67            .batch_enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
68            let new_delete: PlanRef = self.clone_with_input(new_input).into();
69            if self.core.returning {
70                Ok(new_delete)
71            } else {
72                utils::sum_affected_row(new_delete)
73            }
74        } else {
75            let new_input = RequiredDist::single()
76                .batch_enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
77            Ok(self.clone_with_input(new_input).into())
78        }
79    }
80}
81
82impl ToBatchPb for BatchDelete {
83    fn to_batch_prost_body(&self) -> NodeBody {
84        NodeBody::Delete(DeleteNode {
85            table_id: self.core.table_id.table_id(),
86            table_version_id: self.core.table_version_id,
87            returning: self.core.returning,
88            session_id: self.base.ctx().session_ctx().session_id().0 as u32,
89        })
90    }
91}
92
93impl ToLocalBatch for BatchDelete {
94    fn to_local(&self) -> Result<PlanRef> {
95        let new_input = RequiredDist::single()
96            .batch_enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
97        Ok(self.clone_with_input(new_input).into())
98    }
99}
100
101impl ExprRewritable<Batch> for BatchDelete {}
102
103impl ExprVisitable for BatchDelete {}