// risingwave_stream/from_proto/approx_percentile/global.rs
use risingwave_pb::stream_plan::GlobalApproxPercentileNode;
16
17use crate::common::table::state_table::StateTableBuilder;
18use crate::executor::GlobalApproxPercentileExecutor;
19use crate::from_proto::*;
20
21pub struct GlobalApproxPercentileExecutorBuilder;
22
23impl_stream_node_body!(GlobalApproxPercentile(GlobalApproxPercentileNode) => GlobalApproxPercentileExecutorBuilder);
24
25impl ExecutorBuilder for GlobalApproxPercentileExecutorBuilder {
26 type Node = GlobalApproxPercentileNode;
27
28 async fn new_boxed_executor(
29 params: ExecutorParams,
30 node: &Self::Node,
31 store: impl StateStore,
32 ) -> StreamResult<Executor> {
33 let [input]: [_; 1] = params.input.try_into().unwrap();
34 let bucket_table = node
35 .bucket_state_table
36 .as_ref()
37 .expect("bucket_state_table not provided");
38 let count_table = node
39 .count_state_table
40 .as_ref()
41 .expect("count_state_table not provided");
42 let bucket_state_table = StateTableBuilder::new(bucket_table, store.clone(), None)
43 .enable_preload_all_rows_by_config(¶ms.config)
44 .build()
45 .await;
46 let count_state_table = StateTableBuilder::new(count_table, store, None)
47 .enable_preload_all_rows_by_config(¶ms.config)
48 .build()
49 .await;
50 let exec = GlobalApproxPercentileExecutor::new(
51 params.actor_context,
52 input,
53 node.quantile,
54 node.base,
55 params.config.developer.chunk_size,
56 bucket_state_table,
57 count_state_table,
58 )
59 .boxed();
60 Ok(Executor::new(params.info, exec))
61 }
62}