risingwave_frontend/optimizer/plan_node/
stream_now.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_common::types::Datum;
17use risingwave_common::util::value_encoding::DatumToProtoExt;
18use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
19use risingwave_pb::stream_plan::stream_node::NodeBody;
20use risingwave_pb::stream_plan::{PbNowModeGenerateSeries, PbNowModeUpdateCurrent, PbNowNode};
21
22use super::generic::{GenericPlanNode, Mode};
23use super::stream::prelude::*;
24use super::utils::{Distill, TableCatalogBuilder, childless_record};
25use super::{ExprRewritable, PlanBase, StreamNode, generic};
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::plan_node::utils::column_names_pretty;
28use crate::optimizer::property::{Distribution, Monotonicity, MonotonicityMap, WatermarkColumns};
29use crate::stream_fragmenter::BuildFragmentGraphState;
30
/// Stream-convention plan node wrapping [`generic::Now`].
///
/// It has no inputs in this file's view: it is registered as a leaf node via
/// `impl_plan_tree_node_for_leaf!` below.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamNow {
    /// Plan base for the `Stream` convention, built in [`StreamNow::new`].
    pub base: PlanBase<Stream>,
    /// The generic `Now` core; its `mode` selects between update-current and
    /// generate-series behavior.
    core: generic::Now,
}
36
37impl StreamNow {
38    pub fn new(core: generic::Now) -> Self {
39        let mut watermark_columns = WatermarkColumns::new();
40        watermark_columns.insert(0, core.ctx().next_watermark_group_id()); // `StreamNow` generates watermark messages
41
42        let mut columns_monotonicity = MonotonicityMap::new();
43        columns_monotonicity.insert(0, Monotonicity::NonDecreasing);
44
45        let base = PlanBase::new_stream_with_core(
46            &core,
47            Distribution::Single,
48            core.mode.is_generate_series(), // append only
49            core.mode.is_generate_series(), // emit on window close
50            watermark_columns,
51            columns_monotonicity,
52        );
53        Self { base, core }
54    }
55}
56
57impl Distill for StreamNow {
58    fn distill<'a>(&self) -> XmlNode<'a> {
59        let vec = if self.base.ctx().is_explain_verbose() {
60            vec![("output", column_names_pretty(self.schema()))]
61        } else {
62            vec![]
63        };
64
65        childless_record("StreamNow", vec)
66    }
67}
68
// Registers `StreamNow` as a leaf plan node. NOTE(review): the macro is defined elsewhere;
// presumably it generates the plan-tree impls for a node with no inputs — confirm there.
impl_plan_tree_node_for_leaf! { StreamNow }
70
71impl StreamNode for StreamNow {
72    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
73        let schema = self.base.schema();
74        let dist_keys = self.base.distribution().dist_column_indices().to_vec();
75        let mut internal_table_catalog_builder = TableCatalogBuilder::default();
76        schema.fields().iter().for_each(|field| {
77            internal_table_catalog_builder.add_column(field);
78        });
79
80        let table_catalog = internal_table_catalog_builder
81            .build(dist_keys, 0)
82            .with_id(state.gen_table_id_wrapped());
83        NodeBody::Now(Box::new(PbNowNode {
84            state_table: Some(table_catalog.to_internal_table_prost()),
85            mode: Some(match &self.core.mode {
86                Mode::UpdateCurrent => PbNowMode::UpdateCurrent(PbNowModeUpdateCurrent {}),
87                Mode::GenerateSeries {
88                    start_timestamp,
89                    interval,
90                } => PbNowMode::GenerateSeries(PbNowModeGenerateSeries {
91                    start_timestamp: Some(Datum::Some((*start_timestamp).into()).to_protobuf()),
92                    interval: Some(Datum::Some((*interval).into()).to_protobuf()),
93                }),
94            }),
95        }))
96    }
97}
98
// Empty impl: `StreamNow` overrides nothing and relies on the trait's default methods.
impl ExprRewritable for StreamNow {}
100
// Empty impl: `StreamNow` overrides nothing and relies on the trait's default methods.
impl ExprVisitable for StreamNow {}