risingwave_frontend/optimizer/plan_node/
stream_eowc_over_window.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::catalog::Field;
18use risingwave_common::types::DataType;
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_expr::window_function::WindowFuncKind;
21use risingwave_pb::stream_plan::stream_node::PbNodeBody;
22
23use super::generic::{self, PlanWindowFunction};
24use super::stream::prelude::*;
25use super::utils::{TableCatalogBuilder, impl_distill_by_unit};
26use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef};
27use crate::TableCatalog;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::property::{MonotonicityMap, StreamKind, WatermarkColumns};
30use crate::stream_fragmenter::BuildFragmentGraphState;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamEowcOverWindow {
34    pub base: PlanBase<Stream>,
35    core: generic::OverWindow<PlanRef>,
36}
37
38impl StreamEowcOverWindow {
39    pub fn new(core: generic::OverWindow<PlanRef>) -> Self {
40        assert!(core.funcs_have_same_partition_and_order());
41
42        let input = &core.input;
43        assert!(input.append_only());
44        assert!(input.emit_on_window_close());
45
46        // Should order by a single watermark column.
47        let order_key = &core.window_functions[0].order_by;
48        assert_eq!(order_key.len(), 1);
49        assert_eq!(order_key[0].order_type, OrderType::ascending());
50        let order_key_idx = order_key[0].column_index;
51        let input_watermark_cols = core.input.watermark_columns();
52        assert!(input_watermark_cols.contains(order_key_idx));
53
54        // `EowcOverWindowExecutor` cannot produce any watermark columns, because there may be some
55        // ancient rows in some rarely updated partitions that are emitted at the end of time.
56        let watermark_columns = WatermarkColumns::new();
57
58        let base = PlanBase::new_stream_with_core(
59            &core,
60            input.distribution().clone(),
61            StreamKind::AppendOnly,
62            true,
63            watermark_columns,
64            // we cannot derive monotonicity for any column for the same reason as watermark columns
65            MonotonicityMap::new(),
66        );
67        StreamEowcOverWindow { base, core }
68    }
69
70    fn window_functions(&self) -> &[PlanWindowFunction] {
71        &self.core.window_functions
72    }
73
74    fn partition_key_indices(&self) -> Vec<usize> {
75        self.window_functions()[0]
76            .partition_by
77            .iter()
78            .map(|i| i.index())
79            .collect()
80    }
81
82    fn order_key_index(&self) -> usize {
83        self.window_functions()[0].order_by[0].column_index
84    }
85
86    fn infer_state_table(&self) -> TableCatalog {
87        // The EOWC over window state table has the same schema as the input.
88
89        let in_fields = self.core.input.schema().fields();
90        let mut tbl_builder = TableCatalogBuilder::default();
91        for field in in_fields {
92            tbl_builder.add_column(field);
93        }
94
95        let mut order_cols = HashSet::new();
96        let partition_key_indices = self.partition_key_indices();
97        for idx in &partition_key_indices {
98            if !order_cols.contains(idx) {
99                tbl_builder.add_order_column(*idx, OrderType::ascending());
100                order_cols.insert(*idx);
101            }
102        }
103        let read_prefix_len_hint = tbl_builder.get_current_pk_len();
104        let order_key_index = self.order_key_index();
105        if !order_cols.contains(&order_key_index) {
106            tbl_builder.add_order_column(order_key_index, OrderType::ascending());
107            order_cols.insert(order_key_index);
108        }
109        for idx in self.core.input.expect_stream_key() {
110            if !order_cols.contains(idx) {
111                tbl_builder.add_order_column(*idx, OrderType::ascending());
112                order_cols.insert(*idx);
113            }
114        }
115
116        let in_dist_key = self.core.input.distribution().dist_column_indices();
117        tbl_builder.build(in_dist_key.to_vec(), read_prefix_len_hint)
118    }
119
120    /// Returns true if any window function requires intermediate state persistence.
121    /// Currently this includes numbering functions (`row_number`, `rank`, `dense_rank`).
122    fn needs_intermediate_state_table(&self) -> bool {
123        self.window_functions().iter().any(|f| {
124            matches!(
125                f.kind,
126                WindowFuncKind::RowNumber | WindowFuncKind::Rank | WindowFuncKind::DenseRank
127            )
128        })
129    }
130
131    /// Infer the intermediate state table for persisting window function states.
132    ///
133    /// Schema: partition key columns + `state_0..state_{n-1}` (Bytea, one per window function call).
134    /// PK: partition key columns (ascending) only.
135    fn infer_intermediate_state_table(&self) -> TableCatalog {
136        let in_fields = self.core.input.schema().fields();
137        let mut tbl_builder = TableCatalogBuilder::default();
138
139        // Add partition key columns
140        let partition_key_indices = self.partition_key_indices();
141        for &idx in &partition_key_indices {
142            tbl_builder.add_column(&in_fields[idx]);
143        }
144
145        // Add state columns: one per window function call (state_0, state_1, ..., state_{n-1})
146        let num_calls = self.window_functions().len();
147        for i in 0..num_calls {
148            tbl_builder.add_column(&Field::with_name(DataType::Bytea, format!("state_{}", i)));
149        }
150
151        // PK = partition key columns only (ascending)
152        for i in 0..partition_key_indices.len() {
153            tbl_builder.add_order_column(i, OrderType::ascending());
154        }
155        let read_prefix_len_hint = tbl_builder.get_current_pk_len();
156
157        // Distribution key: same as partition key distribution
158        let in_dist_key = self.core.input.distribution().dist_column_indices();
159        // Map input distribution key indices to intermediate state table column indices.
160        // The partition key columns are added first at indices 0..partition_key_indices.len().
161        let dist_key: Vec<usize> = in_dist_key
162            .iter()
163            .filter_map(|&idx| {
164                partition_key_indices
165                    .iter()
166                    .position(|&pk_idx| pk_idx == idx)
167            })
168            .collect();
169
170        tbl_builder.build(dist_key, read_prefix_len_hint)
171    }
172}
173
174impl_distill_by_unit!(StreamEowcOverWindow, core, "StreamEowcOverWindow");
175
176impl PlanTreeNodeUnary<Stream> for StreamEowcOverWindow {
177    fn input(&self) -> PlanRef {
178        self.core.input.clone()
179    }
180
181    fn clone_with_input(&self, input: PlanRef) -> Self {
182        let mut core = self.core.clone();
183        core.input = input;
184        Self::new(core)
185    }
186}
187impl_plan_tree_node_for_unary! { Stream, StreamEowcOverWindow }
188
189impl StreamNode for StreamEowcOverWindow {
190    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
191        use risingwave_pb::stream_plan::*;
192
193        let calls = self
194            .window_functions()
195            .iter()
196            .map(PlanWindowFunction::to_protobuf)
197            .collect();
198        let partition_by = self.window_functions()[0]
199            .partition_by
200            .iter()
201            .map(|i| i.index() as _)
202            .collect();
203        let order_by = self.window_functions()[0]
204            .order_by
205            .iter()
206            .map(|o| o.to_protobuf())
207            .collect();
208        let state_table = self
209            .infer_state_table()
210            .with_id(state.gen_table_id_wrapped())
211            .to_internal_table_prost();
212
213        // Build intermediate state table if needed
214        let intermediate_state_table = if self.needs_intermediate_state_table() {
215            Some(
216                self.infer_intermediate_state_table()
217                    .with_id(state.gen_table_id_wrapped())
218                    .to_internal_table_prost(),
219            )
220        } else {
221            None
222        };
223
224        PbNodeBody::EowcOverWindow(Box::new(EowcOverWindowNode {
225            calls,
226            partition_by,
227            order_by,
228            state_table: Some(state_table),
229            intermediate_state_table,
230        }))
231    }
232}
233
234impl ExprRewritable<Stream> for StreamEowcOverWindow {}
235
236impl ExprVisitable for StreamEowcOverWindow {}