risingwave_frontend/optimizer/rule/stream/
generate_series_with_now_rule.rs1use risingwave_common::types::DataType;
16use risingwave_pb::expr::table_function::PbType as PbTableFuncType;
17
18use crate::expr::{Expr, ExprRewriter};
19use crate::optimizer::plan_node::{LogicalNow, generic};
20use crate::optimizer::rule::prelude::{PlanRef, *};
21
22pub struct GenerateSeriesWithNowRule {}
23impl Rule<Logical> for GenerateSeriesWithNowRule {
24 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
25 let ctx = plan.ctx();
26 let table_func = plan.as_logical_table_function()?.table_function();
27
28 if !table_func.args.iter().any(|arg| arg.has_now()) {
29 return None;
30 }
31
32 if !(table_func.function_type == PbTableFuncType::GenerateSeries
33 && table_func.args.len() == 3
34 && table_func.args[0].return_type() == DataType::Timestamptz
35 && table_func.args[1].is_now()
36 && table_func.args[2].return_type() == DataType::Interval)
37 {
38 ctx.warn_to_user(
40 "`now()` is currently only supported in `generate_series(timestamptz, timestamptz, interval)` function as `stop`. \
41 You may not using it correctly. Please kindly check the document."
42 );
43 return None;
44 }
45
46 let start_timestamp = ctx
47 .session_timezone()
48 .rewrite_expr(table_func.args[0].clone())
49 .try_fold_const()
50 .transpose()
51 .ok()
52 .flatten()
53 .flatten();
54 let interval = ctx
55 .session_timezone()
56 .rewrite_expr(table_func.args[2].clone())
57 .try_fold_const()
58 .transpose()
59 .ok()
60 .flatten()
61 .flatten();
62
63 if start_timestamp.is_none() || interval.is_none() {
64 ctx.warn_to_user(
65 "When using `generate_series` with `now()`, the `start` and `step` must be non-NULL constants",
66 );
67 return None;
68 }
69
70 Some(
71 LogicalNow::new(generic::Now::generate_series(
72 ctx,
73 start_timestamp.unwrap().into_timestamptz(),
74 interval.unwrap().into_interval(),
75 ))
76 .into(),
77 )
78 }
79}
80
81impl GenerateSeriesWithNowRule {
82 pub fn create() -> BoxedRule {
83 Box::new(Self {})
84 }
85}