risingwave_stream/from_proto/
sort.rs1use std::sync::Arc;
16
17use risingwave_pb::stream_plan::SortNode;
18
19use super::*;
20use crate::common::table::state_table::StateTableBuilder;
21use crate::executor::eowc::{SortExecutor, SortExecutorArgs};
22
/// Builder that constructs a [`SortExecutor`] from a [`SortNode`] plan node.
///
/// Stateless marker type; all work happens in its `ExecutorBuilder` impl.
pub struct SortExecutorBuilder;
24
// NOTE(review): presumably associates the `Sort` stream-node body variant (which
// carries a `SortNode`) with `SortExecutorBuilder` — confirm against the macro
// definition, which is not visible in this file.
impl_stream_node_body!(Sort(SortNode) => SortExecutorBuilder);
26
27impl ExecutorBuilder for SortExecutorBuilder {
28 type Node = SortNode;
29
30 async fn new_boxed_executor(
31 params: ExecutorParams,
32 node: &Self::Node,
33 store: impl StateStore,
34 ) -> StreamResult<Executor> {
35 let [input]: [_; 1] = params.input.try_into().unwrap();
36 let vnodes = Arc::new(params.vnode_bitmap.expect("vnodes not set for sort"));
37 let state_table = StateTableBuilder::new(node.get_state_table()?, store, Some(vnodes))
38 .enable_preload_all_rows_by_config(¶ms.config)
39 .build()
40 .await;
41 let exec = SortExecutor::new(SortExecutorArgs {
42 actor_ctx: params.actor_context,
43 schema: params.info.schema.clone(),
44 input,
45 buffer_table: state_table,
46 chunk_size: params.config.developer.chunk_size,
47 sort_column_index: node.sort_column_index as _,
48 });
49 Ok((params.info, exec).into())
50 }
51}