risingwave_frontend/optimizer/plan_node/
batch_delete.rs1use risingwave_pb::batch_plan::DeleteNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
22 ToDistributedBatch, generic,
23};
24use crate::error::Result;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::{ToLocalBatch, utils};
27use crate::optimizer::plan_visitor::DistributedDmlVisitor;
28use crate::optimizer::property::{Distribution, Order, RequiredDist};
29
/// Batch plan node for a delete operation.
///
/// It pairs the batch-convention plan base with the generic delete core; the
/// protobuf body emitted for this node is `NodeBody::Delete`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchDelete {
    /// Plan-node base for the batch convention, built from `core` in [`BatchDelete::new`].
    pub base: PlanBase<Batch>,
    /// Generic delete description: the input plan plus the target table id,
    /// table version id, primary-key indices and the `returning` flag.
    pub core: generic::Delete<PlanRef>,
}
36
37impl BatchDelete {
38 pub fn new(core: generic::Delete<PlanRef>) -> Self {
39 let base =
40 PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());
41 Self { base, core }
42 }
43}
44
45impl PlanTreeNodeUnary<Batch> for BatchDelete {
46 fn input(&self) -> PlanRef {
47 self.core.input.clone()
48 }
49
50 fn clone_with_input(&self, input: PlanRef) -> Self {
51 let mut core = self.core.clone();
52 core.input = input;
53 Self::new(core)
54 }
55}
56
// Macro-generated impls for `BatchDelete`.
// NOTE(review): presumably the first derives the generic plan-tree-node impl from
// `PlanTreeNodeUnary<Batch>`, and the second derives the explain/distill output
// from the `core` field under the name "BatchDelete" — confirm against the macro
// definitions, which are not visible here.
impl_plan_tree_node_for_unary! { Batch, BatchDelete }
impl_distill_by_unit!(BatchDelete, core, "BatchDelete");
59
60impl ToDistributedBatch for BatchDelete {
61 fn to_distributed(&self) -> Result<PlanRef> {
62 if DistributedDmlVisitor::dml_should_run_in_distributed(self.input()) {
63 let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
65 (0..self.input().schema().len()).collect(),
66 ))
67 .batch_enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
68 let new_delete: PlanRef = self.clone_with_input(new_input).into();
69 if self.core.returning {
70 Ok(new_delete)
71 } else {
72 utils::sum_affected_row(new_delete)
73 }
74 } else {
75 let new_input = RequiredDist::single()
76 .batch_enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
77 Ok(self.clone_with_input(new_input).into())
78 }
79 }
80}
81
82impl ToBatchPb for BatchDelete {
83 fn to_batch_prost_body(&self) -> NodeBody {
84 let wait_for_persistence = self.base.ctx().batch_plan_dml_wait_persistence();
85 NodeBody::Delete(DeleteNode {
86 table_id: self.core.table_id,
87 table_version_id: self.core.table_version_id,
88 pk_indices: self.core.pk_indices.iter().map(|&idx| idx as u32).collect(),
89 returning: self.core.returning,
90 upsert: self.base.ctx().session_ctx().config().upsert_dml(),
91 session_id: self.base.ctx().session_ctx().session_id().0 as u32,
92 wait_for_persistence,
93 })
94 }
95}
96
97impl ToLocalBatch for BatchDelete {
98 fn to_local(&self) -> Result<PlanRef> {
99 let new_input = RequiredDist::single()
100 .batch_enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
101 Ok(self.clone_with_input(new_input).into())
102 }
103}
104
// Empty impl: `BatchDelete` overrides nothing and relies on the trait's defaults.
// NOTE(review): presumably because this node carries no expressions to rewrite — confirm.
impl ExprRewritable<Batch> for BatchDelete {}
106
// Empty impl: `BatchDelete` overrides nothing and relies on the trait's defaults.
// NOTE(review): presumably because this node carries no expressions to visit — confirm.
impl ExprVisitable for BatchDelete {}