Skip to main content

risingwave_stream/from_proto/
over_window.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::config::streaming::OverWindowCachePolicy;
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_expr::window_function::WindowFuncCall;
20use risingwave_pb::stream_plan::PbOverWindowNode;
21use risingwave_storage::StateStore;
22
23use super::ExecutorBuilder;
24use crate::common::table::state_table::StateTableBuilder;
25use crate::error::StreamResult;
26use crate::executor::{Executor, OverWindowExecutor, OverWindowExecutorArgs};
27use crate::task::ExecutorParams;
28
/// Stateless builder that constructs an `OverWindowExecutor` from a
/// `PbOverWindowNode` plan node (see its `ExecutorBuilder` implementation).
pub struct OverWindowExecutorBuilder;
30
31impl_stream_node_body!(OverWindow(PbOverWindowNode) => OverWindowExecutorBuilder);
32
33impl ExecutorBuilder for OverWindowExecutorBuilder {
34    type Node = PbOverWindowNode;
35
36    async fn new_boxed_executor(
37        params: ExecutorParams,
38        node: &Self::Node,
39        store: impl StateStore,
40    ) -> StreamResult<Executor> {
41        let [input]: [_; 1] = params.input.try_into().unwrap();
42        let calls: Vec<_> = node
43            .get_calls()
44            .iter()
45            .map(WindowFuncCall::from_protobuf)
46            .try_collect()?;
47        let partition_key_indices = node
48            .get_partition_by()
49            .iter()
50            .map(|i| *i as usize)
51            .collect();
52        let (order_key_indices, order_key_order_types) = node
53            .get_order_by()
54            .iter()
55            .map(ColumnOrder::from_protobuf)
56            .map(|o| (o.column_index, o.order_type))
57            .unzip();
58        let vnodes = Some(Arc::new(
59            params
60                .vnode_bitmap
61                .expect("vnodes not set for EOWC over window"),
62        ));
63        let state_table = StateTableBuilder::new(node.get_state_table()?, store, vnodes)
64            .enable_preload_all_rows_by_config(&params.config)
65            .build()
66            .await;
67
68        // Previously, the `cache_policy` is persisted in the plan node.
69        // Now it's always `Unspecified` and we should refer to the job's config override.
70        #[expect(deprecated)]
71        let cache_policy = (node.get_cache_policy())
72            .map_or(params.config.developer.over_window_cache_policy, |v| {
73                OverWindowCachePolicy::from_protobuf(v)
74            });
75
76        let exec = OverWindowExecutor::new(OverWindowExecutorArgs {
77            actor_ctx: params.actor_context,
78
79            input,
80
81            schema: params.info.schema.clone(),
82            calls,
83            partition_key_indices,
84            order_key_indices,
85            order_key_order_types,
86
87            state_table,
88            watermark_epoch: params.watermark_epoch,
89            metrics: params.executor_stats,
90
91            chunk_size: params.config.developer.chunk_size,
92            cache_policy,
93        });
94        Ok((params.info, exec).into())
95    }
96}