risingwave_stream/from_proto/
over_window.rsuse std::sync::Arc;
use risingwave_common::session_config::OverWindowCachePolicy;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_expr::window_function::WindowFuncCall;
use risingwave_pb::stream_plan::PbOverWindowNode;
use risingwave_storage::StateStore;
use super::ExecutorBuilder;
use crate::common::table::state_table::StateTable;
use crate::error::StreamResult;
use crate::executor::{Executor, OverWindowExecutor, OverWindowExecutorArgs};
use crate::task::ExecutorParams;
pub struct OverWindowExecutorBuilder;
impl ExecutorBuilder for OverWindowExecutorBuilder {
type Node = PbOverWindowNode;
async fn new_boxed_executor(
params: ExecutorParams,
node: &Self::Node,
store: impl StateStore,
) -> StreamResult<Executor> {
let [input]: [_; 1] = params.input.try_into().unwrap();
let calls: Vec<_> = node
.get_calls()
.iter()
.map(WindowFuncCall::from_protobuf)
.try_collect()?;
let partition_key_indices = node
.get_partition_by()
.iter()
.map(|i| *i as usize)
.collect();
let (order_key_indices, order_key_order_types) = node
.get_order_by()
.iter()
.map(ColumnOrder::from_protobuf)
.map(|o| (o.column_index, o.order_type))
.unzip();
let vnodes = Some(Arc::new(
params
.vnode_bitmap
.expect("vnodes not set for EOWC over window"),
));
let state_table =
StateTable::from_table_catalog(node.get_state_table()?, store, vnodes).await;
let exec = OverWindowExecutor::new(OverWindowExecutorArgs {
actor_ctx: params.actor_context,
input,
schema: params.info.schema.clone(),
calls,
partition_key_indices,
order_key_indices,
order_key_order_types,
state_table,
watermark_epoch: params.watermark_epoch,
metrics: params.executor_stats,
chunk_size: params.env.config().developer.chunk_size,
cache_policy: OverWindowCachePolicy::from_protobuf(
node.get_cache_policy().unwrap_or_default(),
),
});
Ok((params.info, exec).into())
}
}