risingwave_stream/from_proto/
group_top_n.rs1use std::sync::Arc;
16
17use risingwave_common::catalog::Schema;
18use risingwave_common::hash::{HashKey, HashKeyDispatcher};
19use risingwave_common::types::DataType;
20use risingwave_common::util::sort_util::ColumnOrder;
21use risingwave_pb::stream_plan::GroupTopNNode;
22
23use super::*;
24use crate::common::table::state_table::StateTable;
25use crate::executor::{ActorContextRef, AppendOnlyGroupTopNExecutor, GroupTopNExecutor};
26use crate::task::AtomicU64Ref;
27
28pub struct GroupTopNExecutorBuilder<const APPEND_ONLY: bool>;
29
30impl<const APPEND_ONLY: bool> ExecutorBuilder for GroupTopNExecutorBuilder<APPEND_ONLY> {
31 type Node = GroupTopNNode;
32
33 async fn new_boxed_executor(
34 params: ExecutorParams,
35 node: &Self::Node,
36 store: impl StateStore,
37 ) -> StreamResult<Executor> {
38 let group_by: Vec<usize> = node
39 .get_group_key()
40 .iter()
41 .map(|idx| *idx as usize)
42 .collect();
43 let table = node.get_table()?;
44 let vnodes = params.vnode_bitmap.map(Arc::new);
45 let state_table = StateTable::from_table_catalog(table, store, vnodes).await;
46 let storage_key = table
47 .get_pk()
48 .iter()
49 .map(ColumnOrder::from_protobuf)
50 .collect();
51 let [input]: [_; 1] = params.input.try_into().unwrap();
52 let group_key_types = group_by
53 .iter()
54 .map(|i| input.schema()[*i].data_type())
55 .collect();
56 let order_by = node
57 .order_by
58 .iter()
59 .map(ColumnOrder::from_protobuf)
60 .collect();
61
62 let args = GroupTopNExecutorDispatcherArgs {
63 input,
64 ctx: params.actor_context,
65 schema: params.info.schema.clone(),
66 storage_key,
67 offset_and_limit: (node.offset as usize, node.limit as usize),
68 order_by,
69 group_by,
70 state_table,
71 watermark_epoch: params.watermark_epoch,
72 group_key_types,
73
74 with_ties: node.with_ties,
75 append_only: APPEND_ONLY,
76 };
77 Ok((params.info, args.dispatch()?).into())
78 }
79}
80
81struct GroupTopNExecutorDispatcherArgs<S: StateStore> {
82 input: Executor,
83 ctx: ActorContextRef,
84 schema: Schema,
85 storage_key: Vec<ColumnOrder>,
86 offset_and_limit: (usize, usize),
87 order_by: Vec<ColumnOrder>,
88 group_by: Vec<usize>,
89 state_table: StateTable<S>,
90 watermark_epoch: AtomicU64Ref,
91 group_key_types: Vec<DataType>,
92
93 with_ties: bool,
94 append_only: bool,
95}
96
97impl<S: StateStore> HashKeyDispatcher for GroupTopNExecutorDispatcherArgs<S> {
98 type Output = StreamResult<Box<dyn Execute>>;
99
100 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
101 macro_rules! build {
102 ($excutor:ident, $with_ties:literal) => {
103 Ok($excutor::<K, S, $with_ties>::new(
104 self.input,
105 self.ctx,
106 self.schema,
107 self.storage_key,
108 self.offset_and_limit,
109 self.order_by,
110 self.group_by,
111 self.state_table,
112 self.watermark_epoch,
113 )?
114 .boxed())
115 };
116 }
117 match (self.append_only, self.with_ties) {
118 (true, true) => build!(AppendOnlyGroupTopNExecutor, true),
119 (true, false) => build!(AppendOnlyGroupTopNExecutor, false),
120 (false, true) => build!(GroupTopNExecutor, true),
121 (false, false) => build!(GroupTopNExecutor, false),
122 }
123 }
124
125 fn data_types(&self) -> &[DataType] {
126 &self.group_key_types
127 }
128}