risingwave_stream/from_proto/
over_window.rs1use std::sync::Arc;
16
17use risingwave_common::config::streaming::OverWindowCachePolicy;
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_expr::window_function::WindowFuncCall;
20use risingwave_pb::stream_plan::PbOverWindowNode;
21use risingwave_storage::StateStore;
22
23use super::ExecutorBuilder;
24use crate::common::table::state_table::StateTableBuilder;
25use crate::error::StreamResult;
26use crate::executor::{Executor, OverWindowExecutor, OverWindowExecutorArgs};
27use crate::task::ExecutorParams;
28
/// Zero-sized builder type whose [`ExecutorBuilder`] implementation constructs an
/// [`OverWindowExecutor`] from a [`PbOverWindowNode`].
pub struct OverWindowExecutorBuilder;
30
// NOTE(review): presumably associates the `OverWindow` stream-node body (carrying a
// `PbOverWindowNode`) with `OverWindowExecutorBuilder`; the macro is defined elsewhere —
// confirm against its definition.
impl_stream_node_body!(OverWindow(PbOverWindowNode) => OverWindowExecutorBuilder);
32
33impl ExecutorBuilder for OverWindowExecutorBuilder {
34 type Node = PbOverWindowNode;
35
36 async fn new_boxed_executor(
37 params: ExecutorParams,
38 node: &Self::Node,
39 store: impl StateStore,
40 ) -> StreamResult<Executor> {
41 let [input]: [_; 1] = params.input.try_into().unwrap();
42 let calls: Vec<_> = node
43 .get_calls()
44 .iter()
45 .map(WindowFuncCall::from_protobuf)
46 .try_collect()?;
47 let partition_key_indices = node
48 .get_partition_by()
49 .iter()
50 .map(|i| *i as usize)
51 .collect();
52 let (order_key_indices, order_key_order_types) = node
53 .get_order_by()
54 .iter()
55 .map(ColumnOrder::from_protobuf)
56 .map(|o| (o.column_index, o.order_type))
57 .unzip();
58 let vnodes = Some(Arc::new(
59 params
60 .vnode_bitmap
61 .expect("vnodes not set for EOWC over window"),
62 ));
63 let state_table = StateTableBuilder::new(node.get_state_table()?, store, vnodes)
64 .enable_preload_all_rows_by_config(¶ms.config)
65 .build()
66 .await;
67
68 #[expect(deprecated)]
71 let cache_policy = (node.get_cache_policy())
72 .map_or(params.config.developer.over_window_cache_policy, |v| {
73 OverWindowCachePolicy::from_protobuf(v)
74 });
75
76 let exec = OverWindowExecutor::new(OverWindowExecutorArgs {
77 actor_ctx: params.actor_context,
78
79 input,
80
81 schema: params.info.schema.clone(),
82 calls,
83 partition_key_indices,
84 order_key_indices,
85 order_key_order_types,
86
87 state_table,
88 watermark_epoch: params.watermark_epoch,
89 metrics: params.executor_stats,
90
91 chunk_size: params.config.developer.chunk_size,
92 cache_policy,
93 });
94 Ok((params.info, exec).into())
95 }
96}