risingwave_frontend/optimizer/plan_node/
stream_over_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
18use risingwave_pb::stream_plan::stream_node::PbNodeBody;
19
20use super::generic::{GenericPlanNode, PlanWindowFunction};
21use super::stream::prelude::*;
22use super::utils::{TableCatalogBuilder, impl_distill_by_unit};
23use super::{
24    ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, generic,
25};
26use crate::TableCatalog;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
29use crate::stream_fragmenter::BuildFragmentGraphState;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct StreamOverWindow {
33    pub base: PlanBase<Stream>,
34    core: generic::OverWindow<PlanRef>,
35}
36
37impl StreamOverWindow {
38    pub fn new(core: generic::OverWindow<PlanRef>) -> Result<Self> {
39        assert!(core.funcs_have_same_partition_and_order());
40        reject_upsert_input!(core.input);
41
42        let input = &core.input;
43        let watermark_columns = WatermarkColumns::new();
44
45        let base = PlanBase::new_stream_with_core(
46            &core,
47            input.distribution().clone(),
48            StreamKind::Retract, // general over window cannot be append-only
49            false,
50            watermark_columns,
51            MonotonicityMap::new(), // TODO: derive monotonicity
52        );
53
54        Ok(StreamOverWindow { base, core })
55    }
56
57    fn infer_state_table(&self) -> TableCatalog {
58        let mut tbl_builder = TableCatalogBuilder::default();
59
60        let out_schema = self.core.schema();
61        for field in out_schema.fields() {
62            tbl_builder.add_column(field);
63        }
64
65        let mut order_cols = HashSet::new();
66        for idx in self.core.partition_key_indices() {
67            if order_cols.insert(idx) {
68                tbl_builder.add_order_column(idx, OrderType::ascending());
69            }
70        }
71        let read_prefix_len_hint = tbl_builder.get_current_pk_len();
72        for o in self.core.order_key() {
73            if order_cols.insert(o.column_index) {
74                tbl_builder.add_order_column(o.column_index, o.order_type);
75            }
76        }
77        for &idx in self.core.input.expect_stream_key() {
78            if order_cols.insert(idx) {
79                tbl_builder.add_order_column(idx, OrderType::ascending());
80            }
81        }
82
83        let in_dist_key = self.core.input.distribution().dist_column_indices();
84        tbl_builder.build(in_dist_key.to_vec(), read_prefix_len_hint)
85    }
86}
87
88impl_distill_by_unit!(StreamOverWindow, core, "StreamOverWindow");
89
90impl PlanTreeNodeUnary<Stream> for StreamOverWindow {
91    fn input(&self) -> PlanRef {
92        self.core.input.clone()
93    }
94
95    fn clone_with_input(&self, input: PlanRef) -> Self {
96        let mut core = self.core.clone();
97        core.input = input;
98        Self::new(core).unwrap()
99    }
100}
101impl_plan_tree_node_for_unary! { Stream, StreamOverWindow }
102
103impl StreamNode for StreamOverWindow {
104    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
105        use risingwave_pb::stream_plan::*;
106
107        let calls = self
108            .core
109            .window_functions()
110            .iter()
111            .map(PlanWindowFunction::to_protobuf)
112            .collect();
113        let partition_by = self
114            .core
115            .partition_key_indices()
116            .into_iter()
117            .map(|idx| idx as _)
118            .collect();
119        let order_by = self
120            .core
121            .order_key()
122            .iter()
123            .copied()
124            .map(ColumnOrder::to_protobuf)
125            .collect();
126        let state_table = self
127            .infer_state_table()
128            .with_id(state.gen_table_id_wrapped())
129            .to_internal_table_prost();
130        let cache_policy = self
131            .base
132            .ctx()
133            .session_ctx()
134            .config()
135            .streaming_over_window_cache_policy();
136
137        PbNodeBody::OverWindow(Box::new(OverWindowNode {
138            calls,
139            partition_by,
140            order_by,
141            state_table: Some(state_table),
142            cache_policy: cache_policy.to_protobuf() as _,
143        }))
144    }
145}
146
147impl ExprRewritable<Stream> for StreamOverWindow {}
148
149impl ExprVisitable for StreamOverWindow {}