risingwave_frontend/optimizer/plan_node/
logical_now.rs1use pretty_xmlish::XmlNode;
16use risingwave_common::bail;
17use risingwave_common::catalog::Schema;
18
19use super::generic::{self, GenericPlanRef, Mode};
20use super::utils::{Distill, childless_record};
21use super::{
22 ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalFilter, LogicalValues,
23 PlanBase, PlanRef, PredicatePushdown, RewriteStreamContext, StreamNow, ToBatch, ToStream,
24 ToStreamContext,
25};
26use crate::error::Result;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::utils::column_names_pretty;
29use crate::utils::ColIndexMapping;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct LogicalNow {
33 pub base: PlanBase<Logical>,
34 core: generic::Now,
35}
36
37impl LogicalNow {
38 pub fn new(core: generic::Now) -> Self {
39 let base = PlanBase::new_logical_with_core(&core);
40 Self { base, core }
41 }
42
43 pub fn max_one_row(&self) -> bool {
44 match self.core.mode {
45 Mode::UpdateCurrent => true,
46 Mode::GenerateSeries { .. } => false,
47 }
48 }
49}
50
51impl Distill for LogicalNow {
52 fn distill<'a>(&self) -> XmlNode<'a> {
53 let vec = if self.base.ctx().is_explain_verbose() {
54 vec![("output", column_names_pretty(self.schema()))]
55 } else {
56 vec![]
57 };
58
59 childless_record("LogicalNow", vec)
60 }
61}
62
63impl_plan_tree_node_for_leaf! { LogicalNow }
64
65impl ExprRewritable for LogicalNow {}
66
67impl ExprVisitable for LogicalNow {}
68
69impl PredicatePushdown for LogicalNow {
70 fn predicate_pushdown(
71 &self,
72 predicate: crate::utils::Condition,
73 _ctx: &mut super::PredicatePushdownContext,
74 ) -> crate::PlanRef {
75 LogicalFilter::create(self.clone().into(), predicate)
76 }
77}
78
79impl ToStream for LogicalNow {
80 fn logical_rewrite_for_stream(
81 &self,
82 _ctx: &mut RewriteStreamContext,
83 ) -> Result<(PlanRef, ColIndexMapping)> {
84 Ok((self.clone().into(), ColIndexMapping::new(vec![Some(0)], 1)))
85 }
86
87 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
89 Ok(StreamNow::new(self.core.clone()).into())
90 }
91}
92
93impl ToBatch for LogicalNow {
94 fn to_batch(&self) -> Result<PlanRef> {
95 bail!("`LogicalNow` can only be converted to stream")
96 }
97}
98
99impl ColPrunable for LogicalNow {
101 fn prune_col(&self, required_cols: &[usize], _: &mut ColumnPruningContext) -> PlanRef {
102 if required_cols.is_empty() {
103 LogicalValues::new(vec![], Schema::empty().clone(), self.ctx()).into()
104 } else {
105 assert_eq!(required_cols, &[0], "we only output one column");
106 self.clone().into()
107 }
108 }
109}