risingwave_stream/from_proto/
over_window.rs1use std::sync::Arc;
16
17use risingwave_common::session_config::OverWindowCachePolicy;
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_expr::window_function::WindowFuncCall;
20use risingwave_pb::stream_plan::PbOverWindowNode;
21use risingwave_storage::StateStore;
22
23use super::ExecutorBuilder;
24use crate::common::table::state_table::StateTableBuilder;
25use crate::error::StreamResult;
26use crate::executor::{Executor, OverWindowExecutor, OverWindowExecutorArgs};
27use crate::task::ExecutorParams;
28
/// Stateless marker type that implements [`ExecutorBuilder`] for over-window plan nodes.
///
/// It carries no data; it only selects which `ExecutorBuilder` implementation is used to turn
/// a `PbOverWindowNode` into an executor.
pub struct OverWindowExecutorBuilder;
30
31impl ExecutorBuilder for OverWindowExecutorBuilder {
32 type Node = PbOverWindowNode;
33
34 async fn new_boxed_executor(
35 params: ExecutorParams,
36 node: &Self::Node,
37 store: impl StateStore,
38 ) -> StreamResult<Executor> {
39 let [input]: [_; 1] = params.input.try_into().unwrap();
40 let calls: Vec<_> = node
41 .get_calls()
42 .iter()
43 .map(WindowFuncCall::from_protobuf)
44 .try_collect()?;
45 let partition_key_indices = node
46 .get_partition_by()
47 .iter()
48 .map(|i| *i as usize)
49 .collect();
50 let (order_key_indices, order_key_order_types) = node
51 .get_order_by()
52 .iter()
53 .map(ColumnOrder::from_protobuf)
54 .map(|o| (o.column_index, o.order_type))
55 .unzip();
56 let vnodes = Some(Arc::new(
57 params
58 .vnode_bitmap
59 .expect("vnodes not set for EOWC over window"),
60 ));
61 let state_table = StateTableBuilder::new(node.get_state_table()?, store, vnodes)
62 .enable_preload_all_rows_by_config(¶ms.actor_context.streaming_config)
63 .build()
64 .await;
65 let exec = OverWindowExecutor::new(OverWindowExecutorArgs {
66 actor_ctx: params.actor_context,
67
68 input,
69
70 schema: params.info.schema.clone(),
71 calls,
72 partition_key_indices,
73 order_key_indices,
74 order_key_order_types,
75
76 state_table,
77 watermark_epoch: params.watermark_epoch,
78 metrics: params.executor_stats,
79
80 chunk_size: params.env.config().developer.chunk_size,
81 cache_policy: OverWindowCachePolicy::from_protobuf(
82 node.get_cache_policy().unwrap_or_default(),
83 ),
84 });
85 Ok((params.info, exec).into())
86 }
87}