risingwave_frontend/optimizer/plan_node/
logical_update.rs1use super::generic::GenericPlanRef;
16use super::utils::impl_distill_by_unit;
17use super::{
18 BatchUpdate, ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, PlanRef,
19 PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, gen_filter_and_pushdown, generic,
20};
21use crate::error::Result;
22use crate::expr::{ExprRewriter, ExprVisitor};
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::{
25 ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
26};
27use crate::utils::{ColIndexMapping, Condition};
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct LogicalUpdate {
35 pub base: PlanBase<Logical>,
36 core: generic::Update<PlanRef>,
37}
38
39impl From<generic::Update<PlanRef>> for LogicalUpdate {
40 fn from(core: generic::Update<PlanRef>) -> Self {
41 let base = PlanBase::new_logical_with_core(&core);
42 Self { base, core }
43 }
44}
45
46impl PlanTreeNodeUnary for LogicalUpdate {
47 fn input(&self) -> PlanRef {
48 self.core.input.clone()
49 }
50
51 fn clone_with_input(&self, input: PlanRef) -> Self {
52 let mut core = self.core.clone();
53 core.input = input;
54 core.into()
55 }
56}
57
58impl_plan_tree_node_for_unary! { LogicalUpdate }
59impl_distill_by_unit!(LogicalUpdate, core, "LogicalUpdate");
60
61impl ExprRewritable for LogicalUpdate {
62 fn has_rewritable_expr(&self) -> bool {
63 true
64 }
65
66 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
67 let mut core = self.core.clone();
68 core.rewrite_exprs(r);
69 Self::from(core).into()
70 }
71}
72
73impl ExprVisitable for LogicalUpdate {
74 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
75 self.core.visit_exprs(v);
76 }
77}
78
79impl ColPrunable for LogicalUpdate {
80 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
81 let pruned_input = {
82 let input = &self.core.input;
83 let required_cols: Vec<_> = (0..input.schema().len()).collect();
84 input.prune_col(&required_cols, ctx)
85 };
86
87 LogicalProject::with_out_col_idx(
89 self.clone_with_input(pruned_input).into(),
90 required_cols.iter().copied(),
91 )
92 .into()
93 }
94}
95
96impl PredicatePushdown for LogicalUpdate {
97 fn predicate_pushdown(
98 &self,
99 predicate: Condition,
100 ctx: &mut PredicatePushdownContext,
101 ) -> PlanRef {
102 gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
103 }
104}
105
106impl ToBatch for LogicalUpdate {
107 fn to_batch(&self) -> Result<PlanRef> {
108 let new_input = self.input().to_batch()?;
109 let mut new_logical = self.core.clone();
110 new_logical.input = new_input;
111 Ok(BatchUpdate::new(new_logical, self.schema().clone()).into())
112 }
113}
114
115impl ToStream for LogicalUpdate {
116 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
117 unreachable!("update should always be converted to batch plan");
118 }
119
120 fn logical_rewrite_for_stream(
121 &self,
122 _ctx: &mut RewriteStreamContext,
123 ) -> Result<(PlanRef, ColIndexMapping)> {
124 unreachable!("update should always be converted to batch plan");
125 }
126}