risingwave_stream/from_proto/
over_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::session_config::OverWindowCachePolicy;
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_expr::window_function::WindowFuncCall;
20use risingwave_pb::stream_plan::PbOverWindowNode;
21use risingwave_storage::StateStore;
22
23use super::ExecutorBuilder;
24use crate::common::table::state_table::StateTable;
25use crate::error::StreamResult;
26use crate::executor::{Executor, OverWindowExecutor, OverWindowExecutorArgs};
27use crate::task::ExecutorParams;
28
29pub struct OverWindowExecutorBuilder;
30
31impl ExecutorBuilder for OverWindowExecutorBuilder {
32    type Node = PbOverWindowNode;
33
34    async fn new_boxed_executor(
35        params: ExecutorParams,
36        node: &Self::Node,
37        store: impl StateStore,
38    ) -> StreamResult<Executor> {
39        let [input]: [_; 1] = params.input.try_into().unwrap();
40        let calls: Vec<_> = node
41            .get_calls()
42            .iter()
43            .map(WindowFuncCall::from_protobuf)
44            .try_collect()?;
45        let partition_key_indices = node
46            .get_partition_by()
47            .iter()
48            .map(|i| *i as usize)
49            .collect();
50        let (order_key_indices, order_key_order_types) = node
51            .get_order_by()
52            .iter()
53            .map(ColumnOrder::from_protobuf)
54            .map(|o| (o.column_index, o.order_type))
55            .unzip();
56        let vnodes = Some(Arc::new(
57            params
58                .vnode_bitmap
59                .expect("vnodes not set for EOWC over window"),
60        ));
61        let state_table =
62            StateTable::from_table_catalog(node.get_state_table()?, store, vnodes).await;
63        let exec = OverWindowExecutor::new(OverWindowExecutorArgs {
64            actor_ctx: params.actor_context,
65
66            input,
67
68            schema: params.info.schema.clone(),
69            calls,
70            partition_key_indices,
71            order_key_indices,
72            order_key_order_types,
73
74            state_table,
75            watermark_epoch: params.watermark_epoch,
76            metrics: params.executor_stats,
77
78            chunk_size: params.env.config().developer.chunk_size,
79            cache_policy: OverWindowCachePolicy::from_protobuf(
80                node.get_cache_policy().unwrap_or_default(),
81            ),
82        });
83        Ok((params.info, exec).into())
84    }
85}