risingwave_frontend/optimizer/plan_node/
logical_now.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_common::bail;
17use risingwave_common::catalog::Schema;
18
19use super::generic::{self, GenericPlanRef, Mode};
20use super::utils::{Distill, childless_record};
21use super::{
22    ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalFilter, LogicalValues,
23    PlanBase, PlanRef, PredicatePushdown, RewriteStreamContext, StreamNow, ToBatch, ToStream,
24    ToStreamContext,
25};
26use crate::error::Result;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::utils::column_names_pretty;
29use crate::utils::ColIndexMapping;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct LogicalNow {
33    pub base: PlanBase<Logical>,
34    core: generic::Now,
35}
36
37impl LogicalNow {
38    pub fn new(core: generic::Now) -> Self {
39        let base = PlanBase::new_logical_with_core(&core);
40        Self { base, core }
41    }
42
43    pub fn max_one_row(&self) -> bool {
44        match self.core.mode {
45            Mode::UpdateCurrent => true,
46            Mode::GenerateSeries { .. } => false,
47        }
48    }
49}
50
51impl Distill for LogicalNow {
52    fn distill<'a>(&self) -> XmlNode<'a> {
53        let vec = if self.base.ctx().is_explain_verbose() {
54            vec![("output", column_names_pretty(self.schema()))]
55        } else {
56            vec![]
57        };
58
59        childless_record("LogicalNow", vec)
60    }
61}
62
63impl_plan_tree_node_for_leaf! { LogicalNow }
64
65impl ExprRewritable for LogicalNow {}
66
67impl ExprVisitable for LogicalNow {}
68
69impl PredicatePushdown for LogicalNow {
70    fn predicate_pushdown(
71        &self,
72        predicate: crate::utils::Condition,
73        _ctx: &mut super::PredicatePushdownContext,
74    ) -> crate::PlanRef {
75        LogicalFilter::create(self.clone().into(), predicate)
76    }
77}
78
79impl ToStream for LogicalNow {
80    fn logical_rewrite_for_stream(
81        &self,
82        _ctx: &mut RewriteStreamContext,
83    ) -> Result<(PlanRef, ColIndexMapping)> {
84        Ok((self.clone().into(), ColIndexMapping::new(vec![Some(0)], 1)))
85    }
86
87    /// `to_stream` is equivalent to `to_stream_with_dist_required(RequiredDist::Any)`
88    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
89        Ok(StreamNow::new(self.core.clone()).into())
90    }
91}
92
93impl ToBatch for LogicalNow {
94    fn to_batch(&self) -> Result<PlanRef> {
95        bail!("`LogicalNow` can only be converted to stream")
96    }
97}
98
99/// The trait for column pruning, only logical plan node will use it, though all plan node impl it.
100impl ColPrunable for LogicalNow {
101    fn prune_col(&self, required_cols: &[usize], _: &mut ColumnPruningContext) -> PlanRef {
102        if required_cols.is_empty() {
103            LogicalValues::new(vec![], Schema::empty().clone(), self.ctx()).into()
104        } else {
105            assert_eq!(required_cols, &[0], "we only output one column");
106            self.clone().into()
107        }
108    }
109}