// risingwave_stream/from_proto/group_top_n.rs
use std::sync::Arc;
16
17use risingwave_common::catalog::Schema;
18use risingwave_common::hash::{HashKey, HashKeyDispatcher};
19use risingwave_common::types::DataType;
20use risingwave_common::util::sort_util::ColumnOrder;
21use risingwave_pb::stream_plan::GroupTopNNode;
22
23use super::*;
24use crate::common::table::state_table::{StateTable, StateTableBuilder};
25use crate::executor::{ActorContextRef, AppendOnlyGroupTopNExecutor, GroupTopNExecutor};
26use crate::task::AtomicU64Ref;
27
/// Builder that turns a `GroupTopNNode` plan node into a group top-N executor.
///
/// This is a zero-sized marker type. The `APPEND_ONLY` const parameter is
/// forwarded as the `append_only` flag when the executor is dispatched, which
/// selects `AppendOnlyGroupTopNExecutor` (`true`) or `GroupTopNExecutor`
/// (`false`).
pub struct GroupTopNExecutorBuilder<const APPEND_ONLY: bool>;
29
30impl_stream_node_body!(GroupTopN(GroupTopNNode) => GroupTopNExecutorBuilder<false>);
31impl_stream_node_body!(AppendOnlyGroupTopN(GroupTopNNode) => GroupTopNExecutorBuilder<true>);
32
33impl<const APPEND_ONLY: bool> ExecutorBuilder for GroupTopNExecutorBuilder<APPEND_ONLY> {
34 type Node = GroupTopNNode;
35
36 async fn new_boxed_executor(
37 params: ExecutorParams,
38 node: &Self::Node,
39 store: impl StateStore,
40 ) -> StreamResult<Executor> {
41 let group_by: Vec<usize> = node
42 .get_group_key()
43 .iter()
44 .map(|idx| *idx as usize)
45 .collect();
46 let table = node.get_table()?;
47 let vnodes = params.vnode_bitmap.map(Arc::new);
48 let state_table = StateTableBuilder::new(table, store, vnodes)
49 .enable_preload_all_rows_by_config(¶ms.config)
50 .build()
51 .await;
52 let storage_key = table
53 .get_pk()
54 .iter()
55 .map(ColumnOrder::from_protobuf)
56 .collect();
57 let [input]: [_; 1] = params.input.try_into().unwrap();
58 let group_key_types = group_by
59 .iter()
60 .map(|i| input.schema()[*i].data_type())
61 .collect();
62 let order_by = node
63 .order_by
64 .iter()
65 .map(ColumnOrder::from_protobuf)
66 .collect();
67
68 let args = GroupTopNExecutorDispatcherArgs {
69 input,
70 ctx: params.actor_context,
71 schema: params.info.schema.clone(),
72 storage_key,
73 offset_and_limit: (node.offset as usize, node.limit as usize),
74 order_by,
75 group_by,
76 state_table,
77 watermark_epoch: params.watermark_epoch,
78 group_key_types,
79
80 with_ties: node.with_ties,
81 append_only: APPEND_ONLY,
82 };
83 Ok((params.info, args.dispatch()?).into())
84 }
85}
86
87struct GroupTopNExecutorDispatcherArgs<S: StateStore> {
88 input: Executor,
89 ctx: ActorContextRef,
90 schema: Schema,
91 storage_key: Vec<ColumnOrder>,
92 offset_and_limit: (usize, usize),
93 order_by: Vec<ColumnOrder>,
94 group_by: Vec<usize>,
95 state_table: StateTable<S>,
96 watermark_epoch: AtomicU64Ref,
97 group_key_types: Vec<DataType>,
98
99 with_ties: bool,
100 append_only: bool,
101}
102
103impl<S: StateStore> HashKeyDispatcher for GroupTopNExecutorDispatcherArgs<S> {
104 type Output = StreamResult<Box<dyn Execute>>;
105
106 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
107 macro_rules! build {
108 ($executor:ident, $with_ties:literal) => {
109 Ok($executor::<K, S, $with_ties>::new(
110 self.input,
111 self.ctx,
112 self.schema,
113 self.storage_key,
114 self.offset_and_limit,
115 self.order_by,
116 self.group_by,
117 self.state_table,
118 self.watermark_epoch,
119 )?
120 .boxed())
121 };
122 }
123 match (self.append_only, self.with_ties) {
124 (true, true) => build!(AppendOnlyGroupTopNExecutor, true),
125 (true, false) => build!(AppendOnlyGroupTopNExecutor, false),
126 (false, true) => build!(GroupTopNExecutor, true),
127 (false, false) => build!(GroupTopNExecutor, false),
128 }
129 }
130
131 fn data_types(&self) -> &[DataType] {
132 &self.group_key_types
133 }
134}