risingwave_frontend/optimizer/plan_node/
batch_delete.rs1use risingwave_pb::batch_plan::DeleteNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21 ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, generic,
22};
23use crate::error::Result;
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::{ToLocalBatch, utils};
26use crate::optimizer::plan_visitor::DistributedDmlVisitor;
27use crate::optimizer::property::{Distribution, Order, RequiredDist};
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct BatchDelete {
32 pub base: PlanBase<Batch>,
33 pub core: generic::Delete<PlanRef>,
34}
35
36impl BatchDelete {
37 pub fn new(core: generic::Delete<PlanRef>) -> Self {
38 let base =
39 PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());
40 Self { base, core }
41 }
42}
43
44impl PlanTreeNodeUnary for BatchDelete {
45 fn input(&self) -> PlanRef {
46 self.core.input.clone()
47 }
48
49 fn clone_with_input(&self, input: PlanRef) -> Self {
50 let mut core = self.core.clone();
51 core.input = input;
52 Self::new(core)
53 }
54}
55
56impl_plan_tree_node_for_unary! { BatchDelete }
57impl_distill_by_unit!(BatchDelete, core, "BatchDelete");
58
59impl ToDistributedBatch for BatchDelete {
60 fn to_distributed(&self) -> Result<PlanRef> {
61 if DistributedDmlVisitor::dml_should_run_in_distributed(self.input()) {
62 let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
64 (0..self.input().schema().len()).collect(),
65 ))
66 .enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
67 let new_delete: PlanRef = self.clone_with_input(new_input).into();
68 if self.core.returning {
69 Ok(new_delete)
70 } else {
71 utils::sum_affected_row(new_delete)
72 }
73 } else {
74 let new_input = RequiredDist::single()
75 .enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
76 Ok(self.clone_with_input(new_input).into())
77 }
78 }
79}
80
81impl ToBatchPb for BatchDelete {
82 fn to_batch_prost_body(&self) -> NodeBody {
83 NodeBody::Delete(DeleteNode {
84 table_id: self.core.table_id.table_id(),
85 table_version_id: self.core.table_version_id,
86 returning: self.core.returning,
87 session_id: self.base.ctx().session_ctx().session_id().0 as u32,
88 })
89 }
90}
91
92impl ToLocalBatch for BatchDelete {
93 fn to_local(&self) -> Result<PlanRef> {
94 let new_input = RequiredDist::single()
95 .enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
96 Ok(self.clone_with_input(new_input).into())
97 }
98}
99
100impl ExprRewritable for BatchDelete {}
101
102impl ExprVisitable for BatchDelete {}