risingwave_stream/from_proto/
sort.rs1use std::sync::Arc;
16
17use risingwave_pb::stream_plan::SortNode;
18
19use super::*;
20use crate::common::table::state_table::StateTable;
21use crate::executor::eowc::{SortExecutor, SortExecutorArgs};
22
/// Stateless builder that turns a [`SortNode`] plan fragment into a [`SortExecutor`]
/// through its [`ExecutorBuilder`] implementation.
pub struct SortExecutorBuilder;
24
25impl ExecutorBuilder for SortExecutorBuilder {
26 type Node = SortNode;
27
28 async fn new_boxed_executor(
29 params: ExecutorParams,
30 node: &Self::Node,
31 store: impl StateStore,
32 ) -> StreamResult<Executor> {
33 let [input]: [_; 1] = params.input.try_into().unwrap();
34 let vnodes = Arc::new(params.vnode_bitmap.expect("vnodes not set for sort"));
35 let state_table =
36 StateTable::from_table_catalog(node.get_state_table()?, store, Some(vnodes)).await;
37 let exec = SortExecutor::new(SortExecutorArgs {
38 actor_ctx: params.actor_context,
39 schema: params.info.schema.clone(),
40 input,
41 buffer_table: state_table,
42 chunk_size: params.env.config().developer.chunk_size,
43 sort_column_index: node.sort_column_index as _,
44 });
45 Ok((params.info, exec).into())
46 }
47}