risingwave_frontend/optimizer/plan_node/
stream_now.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_common::types::Datum;
17use risingwave_common::util::value_encoding::DatumToProtoExt;
18use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
19use risingwave_pb::stream_plan::stream_node::NodeBody;
20use risingwave_pb::stream_plan::{PbNowModeGenerateSeries, PbNowModeUpdateCurrent, PbNowNode};
21
22use super::generic::{GenericPlanNode, Mode};
23use super::stream::prelude::*;
24use super::utils::{Distill, TableCatalogBuilder, childless_record};
25use super::{ExprRewritable, PlanBase, StreamNode, generic};
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::plan_node::utils::column_names_pretty;
28use crate::optimizer::property::{Distribution, Monotonicity, MonotonicityMap, WatermarkColumns};
29use crate::stream_fragmenter::BuildFragmentGraphState;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct StreamNow {
33    pub base: PlanBase<Stream>,
34    core: generic::Now,
35}
36
37impl StreamNow {
38    pub fn new(core: generic::Now) -> Self {
39        let mut watermark_columns = WatermarkColumns::new();
40        watermark_columns.insert(0, core.ctx().next_watermark_group_id()); // `StreamNow` generates watermark messages
41
42        let mut columns_monotonicity = MonotonicityMap::new();
43        columns_monotonicity.insert(0, Monotonicity::NonDecreasing);
44
45        let base = PlanBase::new_stream_with_core(
46            &core,
47            Distribution::Single,
48            if core.mode.is_generate_series() {
49                StreamKind::AppendOnly
50            } else {
51                StreamKind::Retract
52            },
53            core.mode.is_generate_series(), // emit on window close
54            watermark_columns,
55            columns_monotonicity,
56        );
57        Self { base, core }
58    }
59}
60
61impl Distill for StreamNow {
62    fn distill<'a>(&self) -> XmlNode<'a> {
63        let vec = if self.base.ctx().is_explain_verbose() {
64            vec![("output", column_names_pretty(self.schema()))]
65        } else {
66            vec![]
67        };
68
69        childless_record("StreamNow", vec)
70    }
71}
72
73impl_plan_tree_node_for_leaf! { Stream, StreamNow }
74
75impl StreamNode for StreamNow {
76    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
77        let schema = self.base.schema();
78        let mut internal_table_catalog_builder = TableCatalogBuilder::default();
79        schema.fields().iter().for_each(|field| {
80            internal_table_catalog_builder.add_column(field);
81        });
82
83        // `Now` and its state table (recording last generated timestamp) must be singleton.
84        let table_catalog = internal_table_catalog_builder
85            .build(vec![], 0)
86            .with_id(state.gen_table_id_wrapped());
87
88        NodeBody::Now(Box::new(PbNowNode {
89            state_table: Some(table_catalog.to_internal_table_prost()),
90            mode: Some(match &self.core.mode {
91                Mode::UpdateCurrent => PbNowMode::UpdateCurrent(PbNowModeUpdateCurrent {}),
92                Mode::GenerateSeries {
93                    start_timestamp,
94                    interval,
95                } => PbNowMode::GenerateSeries(PbNowModeGenerateSeries {
96                    start_timestamp: Some(Datum::Some((*start_timestamp).into()).to_protobuf()),
97                    interval: Some(Datum::Some((*interval).into()).to_protobuf()),
98                }),
99            }),
100        }))
101    }
102}
103
104impl ExprRewritable<Stream> for StreamNow {}
105
106impl ExprVisitable for StreamNow {}