risingwave_frontend/expr/
now.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::DataType;
16use risingwave_common::util::epoch::Epoch;
17use risingwave_pb::expr::ExprNode;
18use risingwave_pb::expr::expr_node::{self, NowRexNode};
19
20use super::{Expr, ExprImpl, ExprRewriter, FunctionCall, Literal};
21use crate::expr::ExprVisitor;
22
/// Expression node for the SQL `NOW()` function.
///
/// Its meaning depends on the kind of query:
/// - streaming: a retractable, monotonic timestamp stream;
/// - batch: a constant timestamp.
///
/// A `Now` is expected to exist only while optimizing, or inside the table catalog as part of a
/// column default value. Prior to execution it should be replaced: by a `Literal` for batch
/// queries, or by a `NowNode` for streaming queries.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct Now;
32
33impl Expr for Now {
34    fn return_type(&self) -> DataType {
35        DataType::Timestamptz
36    }
37
38    fn to_expr_proto(&self) -> ExprNode {
39        ExprNode {
40            function_type: expr_node::Type::Unspecified.into(),
41            return_type: Some(self.return_type().into()),
42            rex_node: Some(expr_node::RexNode::Now(NowRexNode {})),
43        }
44    }
45}
46
47/// Expression rewriter to inline `NOW()` and `PROCTIME()` to a literal extracted from the epoch.
48///
49/// This should only be applied for batch queries. See the documentation of [`Now`] for details.
50pub struct InlineNowProcTime {
51    /// The current epoch value.
52    epoch: Epoch,
53}
54
55impl InlineNowProcTime {
56    pub fn new(epoch: Epoch) -> Self {
57        Self { epoch }
58    }
59
60    fn literal(&self) -> ExprImpl {
61        Literal::new(Some(self.epoch.as_scalar()), Now.return_type()).into()
62    }
63}
64
65impl ExprRewriter for InlineNowProcTime {
66    fn rewrite_now(&mut self, _now: Now) -> ExprImpl {
67        self.literal()
68    }
69
70    fn rewrite_function_call(&mut self, func_call: super::FunctionCall) -> ExprImpl {
71        let (func_type, inputs, ret) = func_call.decompose();
72
73        if let expr_node::Type::Proctime = func_type {
74            assert!(inputs.is_empty());
75            return self.literal();
76        }
77
78        let inputs = inputs
79            .into_iter()
80            .map(|expr| self.rewrite_expr(expr))
81            .collect();
82        FunctionCall::new_unchecked(func_type, inputs, ret).into()
83    }
84}
85
/// Rewriter that turns every `NOW()` into `PROCTIME()`.
///
/// Used for sink-into-table queries on columns whose default expression contains `now()`,
/// because streaming execution cannot handle the `now` expression.
pub struct RewriteNowToProcTime;
90
91impl ExprRewriter for RewriteNowToProcTime {
92    fn rewrite_now(&mut self, _now: Now) -> ExprImpl {
93        FunctionCall::new(expr_node::Type::Proctime, vec![])
94            .unwrap()
95            .into()
96    }
97}
98
/// Visitor state recording whether a `NOW()` or `PROCTIME()` has been encountered.
///
/// A freshly defaulted finder reports `false`.
#[derive(Default)]
pub struct NowProcTimeFinder {
    /// Set to `true` once a match has been seen; never reset.
    has: bool,
}

impl NowProcTimeFinder {
    /// Returns whether a `NOW()` or `PROCTIME()` has been recorded so far.
    pub fn has(&self) -> bool {
        self.has
    }
}
109
110impl ExprVisitor for NowProcTimeFinder {
111    fn visit_now(&mut self, _: &Now) {
112        self.has = true;
113    }
114
115    fn visit_function_call(&mut self, func_call: &FunctionCall) {
116        if let expr_node::Type::Proctime = func_call.func_type {
117            self.has = true;
118            return;
119        }
120
121        func_call
122            .inputs()
123            .iter()
124            .for_each(|expr| self.visit_expr(expr));
125    }
126}