risingwave_frontend/optimizer/plan_node/generic/
now.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use educe::Educe;
16use enum_as_inner::EnumAsInner;
17use pretty_xmlish::{Pretty, Str, XmlNode};
18use risingwave_common::catalog::{Field, Schema};
19use risingwave_common::types::{DataType, Interval, Timestamptz};
20
21use super::{DistillUnit, GenericPlanNode};
22use crate::OptimizerContextRef;
23use crate::optimizer::plan_node::utils::childless_record;
24use crate::optimizer::property::FunctionalDependencySet;
25
26#[derive(Debug, Clone, Educe)]
27#[educe(PartialEq, Eq, Hash)]
28pub struct Now {
29    #[educe(PartialEq(ignore))]
30    #[educe(Hash(ignore))]
31    pub ctx: OptimizerContextRef,
32
33    pub mode: Mode,
34}
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumAsInner)]
37pub enum Mode {
38    /// Emit current timestamp on startup, update it on barrier.
39    UpdateCurrent,
40    /// Generate a series of timestamps starting from `start_timestamp` with `interval`.
41    /// Keep generating new timestamps on barrier.
42    GenerateSeries {
43        start_timestamp: Timestamptz,
44        interval: Interval,
45    },
46}
47
48impl GenericPlanNode for Now {
49    fn functional_dependency(&self) -> crate::optimizer::property::FunctionalDependencySet {
50        FunctionalDependencySet::new(1) // only one column and no dependency
51    }
52
53    fn schema(&self) -> risingwave_common::catalog::Schema {
54        Schema::new(vec![Field {
55            data_type: DataType::Timestamptz,
56            name: String::from(if self.mode.is_update_current() {
57                "now"
58            } else {
59                "ts"
60            }),
61        }])
62    }
63
64    fn stream_key(&self) -> Option<Vec<usize>> {
65        match self.mode {
66            Mode::UpdateCurrent => Some(vec![]),
67            Mode::GenerateSeries { .. } => Some(vec![0]),
68        }
69    }
70
71    fn ctx(&self) -> OptimizerContextRef {
72        self.ctx.clone()
73    }
74}
75
76impl Now {
77    pub fn update_current(ctx: OptimizerContextRef) -> Self {
78        Self::new_inner(ctx, Mode::UpdateCurrent)
79    }
80
81    pub fn generate_series(
82        ctx: OptimizerContextRef,
83        start_timestamp: Timestamptz,
84        interval: Interval,
85    ) -> Self {
86        Self::new_inner(
87            ctx,
88            Mode::GenerateSeries {
89                start_timestamp,
90                interval,
91            },
92        )
93    }
94
95    fn new_inner(ctx: OptimizerContextRef, mode: Mode) -> Self {
96        Self { ctx, mode }
97    }
98}
99
100impl DistillUnit for Now {
101    fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
102        childless_record(name, vec![("mode", Pretty::debug(&self.mode))])
103    }
104}