risingwave_frontend/optimizer/plan_node/
batch_delete.rs1use risingwave_pb::batch_plan::DeleteNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
22 ToDistributedBatch, generic,
23};
24use crate::error::Result;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::{ToLocalBatch, utils};
27use crate::optimizer::plan_visitor::DistributedDmlVisitor;
28use crate::optimizer::property::{Distribution, Order, RequiredDist};
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct BatchDelete {
33 pub base: PlanBase<Batch>,
34 pub core: generic::Delete<PlanRef>,
35}
36
37impl BatchDelete {
38 pub fn new(core: generic::Delete<PlanRef>) -> Self {
39 let base =
40 PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());
41 Self { base, core }
42 }
43}
44
45impl PlanTreeNodeUnary<Batch> for BatchDelete {
46 fn input(&self) -> PlanRef {
47 self.core.input.clone()
48 }
49
50 fn clone_with_input(&self, input: PlanRef) -> Self {
51 let mut core = self.core.clone();
52 core.input = input;
53 Self::new(core)
54 }
55}
56
57impl_plan_tree_node_for_unary! { Batch, BatchDelete }
58impl_distill_by_unit!(BatchDelete, core, "BatchDelete");
59
60impl ToDistributedBatch for BatchDelete {
61 fn to_distributed(&self) -> Result<PlanRef> {
62 if DistributedDmlVisitor::dml_should_run_in_distributed(self.input()) {
63 let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
65 (0..self.input().schema().len()).collect(),
66 ))
67 .batch_enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
68 let new_delete: PlanRef = self.clone_with_input(new_input).into();
69 if self.core.returning {
70 Ok(new_delete)
71 } else {
72 utils::sum_affected_row(new_delete)
73 }
74 } else {
75 let new_input = RequiredDist::single()
76 .batch_enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
77 Ok(self.clone_with_input(new_input).into())
78 }
79 }
80}
81
82impl ToBatchPb for BatchDelete {
83 fn to_batch_prost_body(&self) -> NodeBody {
84 NodeBody::Delete(DeleteNode {
85 table_id: self.core.table_id.table_id(),
86 table_version_id: self.core.table_version_id,
87 returning: self.core.returning,
88 session_id: self.base.ctx().session_ctx().session_id().0 as u32,
89 })
90 }
91}
92
93impl ToLocalBatch for BatchDelete {
94 fn to_local(&self) -> Result<PlanRef> {
95 let new_input = RequiredDist::single()
96 .batch_enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
97 Ok(self.clone_with_input(new_input).into())
98 }
99}
100
101impl ExprRewritable<Batch> for BatchDelete {}
102
103impl ExprVisitable for BatchDelete {}