risingwave_stream/from_proto/
over_window.rs1use std::sync::Arc;
16
17use risingwave_common::session_config::OverWindowCachePolicy;
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_expr::window_function::WindowFuncCall;
20use risingwave_pb::stream_plan::PbOverWindowNode;
21use risingwave_storage::StateStore;
22
23use super::ExecutorBuilder;
24use crate::common::table::state_table::StateTable;
25use crate::error::StreamResult;
26use crate::executor::{Executor, OverWindowExecutor, OverWindowExecutorArgs};
27use crate::task::ExecutorParams;
28
29pub struct OverWindowExecutorBuilder;
30
31impl ExecutorBuilder for OverWindowExecutorBuilder {
32 type Node = PbOverWindowNode;
33
34 async fn new_boxed_executor(
35 params: ExecutorParams,
36 node: &Self::Node,
37 store: impl StateStore,
38 ) -> StreamResult<Executor> {
39 let [input]: [_; 1] = params.input.try_into().unwrap();
40 let calls: Vec<_> = node
41 .get_calls()
42 .iter()
43 .map(WindowFuncCall::from_protobuf)
44 .try_collect()?;
45 let partition_key_indices = node
46 .get_partition_by()
47 .iter()
48 .map(|i| *i as usize)
49 .collect();
50 let (order_key_indices, order_key_order_types) = node
51 .get_order_by()
52 .iter()
53 .map(ColumnOrder::from_protobuf)
54 .map(|o| (o.column_index, o.order_type))
55 .unzip();
56 let vnodes = Some(Arc::new(
57 params
58 .vnode_bitmap
59 .expect("vnodes not set for EOWC over window"),
60 ));
61 let state_table =
62 StateTable::from_table_catalog(node.get_state_table()?, store, vnodes).await;
63 let exec = OverWindowExecutor::new(OverWindowExecutorArgs {
64 actor_ctx: params.actor_context,
65
66 input,
67
68 schema: params.info.schema.clone(),
69 calls,
70 partition_key_indices,
71 order_key_indices,
72 order_key_order_types,
73
74 state_table,
75 watermark_epoch: params.watermark_epoch,
76 metrics: params.executor_stats,
77
78 chunk_size: params.env.config().developer.chunk_size,
79 cache_policy: OverWindowCachePolicy::from_protobuf(
80 node.get_cache_policy().unwrap_or_default(),
81 ),
82 });
83 Ok((params.info, exec).into())
84 }
85}