risingwave_frontend/expr/
now.rs1use risingwave_common::types::DataType;
16use risingwave_common::util::epoch::Epoch;
17use risingwave_pb::expr::ExprNode;
18use risingwave_pb::expr::expr_node::{self, NowRexNode};
19
20use super::{Expr, ExprImpl, ExprRewriter, FunctionCall, Literal};
21use crate::expr::ExprVisitor;
22
/// Field-less marker value standing for a `now` expression node.
///
/// It carries no state of its own; everything it exposes comes from the trait
/// implementations attached to it in this file.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct Now;
32
33impl Expr for Now {
34 fn return_type(&self) -> DataType {
35 DataType::Timestamptz
36 }
37
38 fn to_expr_proto(&self) -> ExprNode {
39 ExprNode {
40 function_type: expr_node::Type::Unspecified.into(),
41 return_type: Some(self.return_type().into()),
42 rex_node: Some(expr_node::RexNode::Now(NowRexNode {})),
43 }
44 }
45}
46
47pub struct InlineNowProcTime {
51 epoch: Epoch,
53}
54
55impl InlineNowProcTime {
56 pub fn new(epoch: Epoch) -> Self {
57 Self { epoch }
58 }
59
60 fn literal(&self) -> ExprImpl {
61 Literal::new(Some(self.epoch.as_scalar()), Now.return_type()).into()
62 }
63}
64
65impl ExprRewriter for InlineNowProcTime {
66 fn rewrite_now(&mut self, _now: Now) -> ExprImpl {
67 self.literal()
68 }
69
70 fn rewrite_function_call(&mut self, func_call: super::FunctionCall) -> ExprImpl {
71 let (func_type, inputs, ret) = func_call.decompose();
72
73 if let expr_node::Type::Proctime = func_type {
74 assert!(inputs.is_empty());
75 return self.literal();
76 }
77
78 let inputs = inputs
79 .into_iter()
80 .map(|expr| self.rewrite_expr(expr))
81 .collect();
82 FunctionCall::new_unchecked(func_type, inputs, ret).into()
83 }
84}
85
/// Stateless expression rewriter that turns `Now` expressions into
/// `Proctime` function calls.
pub struct RewriteNowToProcTime;
90
91impl ExprRewriter for RewriteNowToProcTime {
92 fn rewrite_now(&mut self, _now: Now) -> ExprImpl {
93 FunctionCall::new(expr_node::Type::Proctime, vec![])
94 .unwrap()
95 .into()
96 }
97}
98
/// Records whether a `Now` expression or a `Proctime` call has been seen.
///
/// A default-constructed finder starts with the flag cleared.
#[derive(Default)]
pub struct NowProcTimeFinder {
    /// Becomes `true` once a match is recorded; nothing in this type clears it.
    has: bool,
}

impl NowProcTimeFinder {
    /// Returns `true` if a match has been recorded so far.
    pub fn has(&self) -> bool {
        let Self { has } = self;
        *has
    }
}
109
110impl ExprVisitor for NowProcTimeFinder {
111 fn visit_now(&mut self, _: &Now) {
112 self.has = true;
113 }
114
115 fn visit_function_call(&mut self, func_call: &FunctionCall) {
116 if let expr_node::Type::Proctime = func_call.func_type {
117 self.has = true;
118 return;
119 }
120
121 func_call
122 .inputs()
123 .iter()
124 .for_each(|expr| self.visit_expr(expr));
125 }
126}