risingwave_frontend/optimizer/plan_node/
batch_update.rs1use risingwave_common::catalog::Schema;
16use risingwave_pb::batch_plan::UpdateNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::utils::impl_distill_by_unit;
21use super::{
22 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
23 ToDistributedBatch, generic,
24};
25use crate::error::Result;
26use crate::expr::{Expr, ExprRewriter, ExprVisitor};
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::{ToLocalBatch, utils};
29use crate::optimizer::plan_visitor::DistributedDmlVisitor;
30use crate::optimizer::property::{Distribution, Order, RequiredDist};
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct BatchUpdate {
35 pub base: PlanBase<Batch>,
36 pub core: generic::Update<PlanRef>,
37}
38
39impl BatchUpdate {
40 pub fn new(core: generic::Update<PlanRef>, schema: Schema) -> Self {
41 let ctx = core.input.ctx();
42 let base =
43 PlanBase::new_batch(ctx, schema, core.input.distribution().clone(), Order::any());
44 Self { base, core }
45 }
46}
47
48impl PlanTreeNodeUnary<Batch> for BatchUpdate {
49 fn input(&self) -> PlanRef {
50 self.core.input.clone()
51 }
52
53 fn clone_with_input(&self, input: PlanRef) -> Self {
54 let mut core = self.core.clone();
55 core.input = input;
56 Self::new(core, self.schema().clone())
57 }
58}
59
60impl_plan_tree_node_for_unary! { Batch, BatchUpdate }
61impl_distill_by_unit!(BatchUpdate, core, "BatchUpdate");
62
63impl ToDistributedBatch for BatchUpdate {
64 fn to_distributed(&self) -> Result<PlanRef> {
65 if DistributedDmlVisitor::dml_should_run_in_distributed(self.input()) {
66 let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
68 (0..self.input().schema().len()).collect(),
69 ))
70 .batch_enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
71 let new_update = self.clone_with_input(new_input).into();
72 if self.core.returning {
73 Ok(new_update)
74 } else {
75 utils::sum_affected_row(new_update)
76 }
77 } else {
78 let new_input = RequiredDist::single()
79 .batch_enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
80 Ok(self.clone_with_input(new_input).into())
81 }
82 }
83}
84
85impl ToBatchPb for BatchUpdate {
86 fn to_batch_prost_body(&self) -> NodeBody {
87 let wait_for_persistence = self.base.ctx().batch_plan_dml_wait_persistence();
88 let old_exprs = (self.core.old_exprs)
89 .iter()
90 .map(|x| x.to_expr_proto())
91 .collect();
92 let new_exprs = (self.core.new_exprs)
93 .iter()
94 .map(|x| x.to_expr_proto())
95 .collect();
96
97 NodeBody::Update(UpdateNode {
98 table_id: self.core.table_id,
99 table_version_id: self.core.table_version_id,
100 returning: self.core.returning,
101 old_exprs,
102 new_exprs,
103 upsert: self.base.ctx().session_ctx().config().upsert_dml(),
104 session_id: self.base.ctx().session_ctx().session_id().0 as u32,
105 wait_for_persistence,
106 })
107 }
108}
109
110impl ToLocalBatch for BatchUpdate {
111 fn to_local(&self) -> Result<PlanRef> {
112 let new_input = RequiredDist::single()
113 .batch_enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
114 Ok(self.clone_with_input(new_input).into())
115 }
116}
117
118impl ExprRewritable<Batch> for BatchUpdate {
119 fn has_rewritable_expr(&self) -> bool {
120 true
121 }
122
123 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
124 let mut core = self.core.clone();
125 core.rewrite_exprs(r);
126 Self::new(core, self.schema().clone()).into()
127 }
128}
129
130impl ExprVisitable for BatchUpdate {
131 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
132 self.core.visit_exprs(v);
133 }
134}