risingwave_frontend/optimizer/plan_node/generic/
now.rs1use educe::Educe;
16use enum_as_inner::EnumAsInner;
17use pretty_xmlish::{Pretty, Str, XmlNode};
18use risingwave_common::catalog::{Field, Schema};
19use risingwave_common::types::{DataType, Interval, Timestamptz};
20
21use super::{DistillUnit, GenericPlanNode};
22use crate::OptimizerContextRef;
23use crate::optimizer::plan_node::utils::childless_record;
24use crate::optimizer::property::FunctionalDependencySet;
25
26#[derive(Debug, Clone, Educe)]
27#[educe(PartialEq, Eq, Hash)]
28pub struct Now {
29 #[educe(PartialEq(ignore))]
30 #[educe(Hash(ignore))]
31 pub ctx: OptimizerContextRef,
32
33 pub mode: Mode,
34}
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumAsInner)]
37pub enum Mode {
38 UpdateCurrent,
40 GenerateSeries {
43 start_timestamp: Timestamptz,
44 interval: Interval,
45 },
46}
47
48impl GenericPlanNode for Now {
49 fn functional_dependency(&self) -> crate::optimizer::property::FunctionalDependencySet {
50 FunctionalDependencySet::new(1) }
52
53 fn schema(&self) -> risingwave_common::catalog::Schema {
54 Schema::new(vec![Field {
55 data_type: DataType::Timestamptz,
56 name: String::from(if self.mode.is_update_current() {
57 "now"
58 } else {
59 "ts"
60 }),
61 }])
62 }
63
64 fn stream_key(&self) -> Option<Vec<usize>> {
65 match self.mode {
66 Mode::UpdateCurrent => Some(vec![]),
67 Mode::GenerateSeries { .. } => Some(vec![0]),
68 }
69 }
70
71 fn ctx(&self) -> OptimizerContextRef {
72 self.ctx.clone()
73 }
74}
75
76impl Now {
77 pub fn update_current(ctx: OptimizerContextRef) -> Self {
78 Self::new_inner(ctx, Mode::UpdateCurrent)
79 }
80
81 pub fn generate_series(
82 ctx: OptimizerContextRef,
83 start_timestamp: Timestamptz,
84 interval: Interval,
85 ) -> Self {
86 Self::new_inner(
87 ctx,
88 Mode::GenerateSeries {
89 start_timestamp,
90 interval,
91 },
92 )
93 }
94
95 fn new_inner(ctx: OptimizerContextRef, mode: Mode) -> Self {
96 Self { ctx, mode }
97 }
98}
99
100impl DistillUnit for Now {
101 fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
102 childless_record(name, vec![("mode", Pretty::debug(&self.mode))])
103 }
104}