risingwave_stream/from_proto/
sort.rs1use std::sync::Arc;
16
17use risingwave_pb::stream_plan::SortNode;
18
19use super::*;
20use crate::common::table::state_table::StateTableBuilder;
21use crate::executor::eowc::{SortExecutor, SortExecutorArgs};
22
/// Zero-sized builder type whose `ExecutorBuilder` implementation constructs a
/// `SortExecutor` from a `SortNode` plan node.
pub struct SortExecutorBuilder;
24
25impl ExecutorBuilder for SortExecutorBuilder {
26 type Node = SortNode;
27
28 async fn new_boxed_executor(
29 params: ExecutorParams,
30 node: &Self::Node,
31 store: impl StateStore,
32 ) -> StreamResult<Executor> {
33 let [input]: [_; 1] = params.input.try_into().unwrap();
34 let vnodes = Arc::new(params.vnode_bitmap.expect("vnodes not set for sort"));
35 let state_table = StateTableBuilder::new(node.get_state_table()?, store, Some(vnodes))
36 .enable_preload_all_rows_by_config(¶ms.actor_context.streaming_config)
37 .build()
38 .await;
39 let exec = SortExecutor::new(SortExecutorArgs {
40 actor_ctx: params.actor_context,
41 schema: params.info.schema.clone(),
42 input,
43 buffer_table: state_table,
44 chunk_size: params.env.config().developer.chunk_size,
45 sort_column_index: node.sort_column_index as _,
46 });
47 Ok((params.info, exec).into())
48 }
49}