risingwave_frontend/optimizer/plan_node/
logical_now.rs1use pretty_xmlish::XmlNode;
16use risingwave_common::bail;
17use risingwave_common::catalog::Schema;
18
19use super::generic::{self, GenericPlanRef, Mode};
20use super::utils::{Distill, childless_record};
21use super::{
22 ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalFilter,
23 LogicalPlanRef as PlanRef, LogicalValues, PlanBase, PredicatePushdown, RewriteStreamContext,
24 StreamNow, ToBatch, ToStream, ToStreamContext,
25};
26use crate::error::Result;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::utils::column_names_pretty;
29use crate::utils::ColIndexMapping;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct LogicalNow {
33 pub base: PlanBase<Logical>,
34 core: generic::Now,
35}
36
37impl LogicalNow {
38 pub fn new(core: generic::Now) -> Self {
39 let base = PlanBase::new_logical_with_core(&core);
40 Self { base, core }
41 }
42
43 pub fn max_one_row(&self) -> bool {
44 match self.core.mode {
45 Mode::UpdateCurrent => true,
46 Mode::GenerateSeries { .. } => false,
47 }
48 }
49}
50
51impl Distill for LogicalNow {
52 fn distill<'a>(&self) -> XmlNode<'a> {
53 let vec = if self.base.ctx().is_explain_verbose() {
54 vec![("output", column_names_pretty(self.schema()))]
55 } else {
56 vec![]
57 };
58
59 childless_record("LogicalNow", vec)
60 }
61}
62
63impl_plan_tree_node_for_leaf! { Logical, LogicalNow }
64
65impl ExprRewritable<Logical> for LogicalNow {}
66
67impl ExprVisitable for LogicalNow {}
68
69impl PredicatePushdown for LogicalNow {
70 fn predicate_pushdown(
71 &self,
72 predicate: crate::utils::Condition,
73 _ctx: &mut super::PredicatePushdownContext,
74 ) -> PlanRef {
75 LogicalFilter::create(self.clone().into(), predicate)
76 }
77}
78
79impl ToStream for LogicalNow {
80 fn logical_rewrite_for_stream(
81 &self,
82 _ctx: &mut RewriteStreamContext,
83 ) -> Result<(PlanRef, ColIndexMapping)> {
84 Ok((self.clone().into(), ColIndexMapping::new(vec![Some(0)], 1)))
85 }
86
87 fn to_stream(
89 &self,
90 _ctx: &mut ToStreamContext,
91 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
92 Ok(StreamNow::new(self.core.clone()).into())
93 }
94}
95
96impl ToBatch for LogicalNow {
97 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
98 bail!("`LogicalNow` can only be converted to stream")
99 }
100}
101
102impl ColPrunable for LogicalNow {
104 fn prune_col(&self, required_cols: &[usize], _: &mut ColumnPruningContext) -> PlanRef {
105 if required_cols.is_empty() {
106 LogicalValues::new(vec![], Schema::empty().clone(), self.ctx()).into()
107 } else {
108 assert_eq!(required_cols, &[0], "we only output one column");
109 self.clone().into()
110 }
111 }
112}