risingwave_frontend/optimizer/rule/stream/
generate_series_with_now_rule.rsuse risingwave_common::types::DataType;
use risingwave_pb::expr::table_function::PbType as PbTableFuncType;
use crate::expr::{Expr, ExprRewriter};
use crate::optimizer::plan_node::{generic, LogicalNow};
use crate::optimizer::rule::{BoxedRule, Rule};
use crate::PlanRef;
pub struct GenerateSeriesWithNowRule {}
impl Rule for GenerateSeriesWithNowRule {
fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
let ctx = plan.ctx();
let table_func = plan.as_logical_table_function()?.table_function();
if !table_func.args.iter().any(|arg| arg.has_now()) {
return None;
}
if !(table_func.function_type == PbTableFuncType::GenerateSeries
&& table_func.args.len() == 3
&& table_func.args[0].return_type() == DataType::Timestamptz
&& table_func.args[1].is_now()
&& table_func.args[2].return_type() == DataType::Interval)
{
ctx.warn_to_user(
"`now()` is currently only supported in `generate_series(timestamptz, timestamptz, interval)` function as `stop`. \
You may not using it correctly. Please kindly check the document."
);
return None;
}
let start_timestamp = ctx
.session_timezone()
.rewrite_expr(table_func.args[0].clone())
.try_fold_const()
.transpose()
.ok()
.flatten()
.flatten();
let interval = ctx
.session_timezone()
.rewrite_expr(table_func.args[2].clone())
.try_fold_const()
.transpose()
.ok()
.flatten()
.flatten();
if start_timestamp.is_none() || interval.is_none() {
ctx.warn_to_user(
"When using `generate_series` with `now()`, the `start` and `step` must be non-NULL constants",
);
return None;
}
Some(
LogicalNow::new(generic::Now::generate_series(
ctx,
start_timestamp.unwrap().into_timestamptz(),
interval.unwrap().into_interval(),
))
.into(),
)
}
}
impl GenerateSeriesWithNowRule {
pub fn create() -> BoxedRule {
Box::new(Self {})
}
}