risingwave_frontend/optimizer/rule/stream/
generate_series_with_now_rule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::DataType;
16use risingwave_pb::expr::table_function::PbType as PbTableFuncType;
17
18use crate::PlanRef;
19use crate::expr::{Expr, ExprRewriter};
20use crate::optimizer::plan_node::{LogicalNow, generic};
21use crate::optimizer::rule::{BoxedRule, Rule};
22
23pub struct GenerateSeriesWithNowRule {}
24impl Rule for GenerateSeriesWithNowRule {
25    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
26        let ctx = plan.ctx();
27        let table_func = plan.as_logical_table_function()?.table_function();
28
29        if !table_func.args.iter().any(|arg| arg.has_now()) {
30            return None;
31        }
32
33        if !(table_func.function_type == PbTableFuncType::GenerateSeries
34            && table_func.args.len() == 3
35            && table_func.args[0].return_type() == DataType::Timestamptz
36            && table_func.args[1].is_now()
37            && table_func.args[2].return_type() == DataType::Interval)
38        {
39            // only convert `generate_series(const timestamptz, now(), const interval)`
40            ctx.warn_to_user(
41                "`now()` is currently only supported in `generate_series(timestamptz, timestamptz, interval)` function as `stop`. \
42                You may not using it correctly. Please kindly check the document."
43            );
44            return None;
45        }
46
47        let start_timestamp = ctx
48            .session_timezone()
49            .rewrite_expr(table_func.args[0].clone())
50            .try_fold_const()
51            .transpose()
52            .ok()
53            .flatten()
54            .flatten();
55        let interval = ctx
56            .session_timezone()
57            .rewrite_expr(table_func.args[2].clone())
58            .try_fold_const()
59            .transpose()
60            .ok()
61            .flatten()
62            .flatten();
63
64        if start_timestamp.is_none() || interval.is_none() {
65            ctx.warn_to_user(
66                "When using `generate_series` with `now()`, the `start` and `step` must be non-NULL constants",
67            );
68            return None;
69        }
70
71        Some(
72            LogicalNow::new(generic::Now::generate_series(
73                ctx,
74                start_timestamp.unwrap().into_timestamptz(),
75                interval.unwrap().into_interval(),
76            ))
77            .into(),
78        )
79    }
80}
81
82impl GenerateSeriesWithNowRule {
83    pub fn create() -> BoxedRule {
84        Box::new(Self {})
85    }
86}