risingwave_stream/from_proto/approx_percentile/
global.rs1use risingwave_pb::stream_plan::GlobalApproxPercentileNode;
16
17use crate::common::table::state_table::StateTableBuilder;
18use crate::executor::GlobalApproxPercentileExecutor;
19use crate::from_proto::*;
20
/// Stateless builder that turns a [`GlobalApproxPercentileNode`] plan node into a
/// [`GlobalApproxPercentileExecutor`].
///
/// The type carries no data; all construction logic lives in its
/// `ExecutorBuilder` implementation.
pub struct GlobalApproxPercentileExecutorBuilder;
22
23impl ExecutorBuilder for GlobalApproxPercentileExecutorBuilder {
24 type Node = GlobalApproxPercentileNode;
25
26 async fn new_boxed_executor(
27 params: ExecutorParams,
28 node: &Self::Node,
29 store: impl StateStore,
30 ) -> StreamResult<Executor> {
31 let [input]: [_; 1] = params.input.try_into().unwrap();
32 let bucket_table = node
33 .bucket_state_table
34 .as_ref()
35 .expect("bucket_state_table not provided");
36 let count_table = node
37 .count_state_table
38 .as_ref()
39 .expect("count_state_table not provided");
40 let bucket_state_table = StateTableBuilder::new(bucket_table, store.clone(), None)
41 .enable_preload_all_rows_by_config(¶ms.actor_context.streaming_config)
42 .build()
43 .await;
44 let count_state_table = StateTableBuilder::new(count_table, store, None)
45 .enable_preload_all_rows_by_config(¶ms.actor_context.streaming_config)
46 .build()
47 .await;
48 let exec = GlobalApproxPercentileExecutor::new(
49 params.actor_context,
50 input,
51 node.quantile,
52 node.base,
53 params.env.config().developer.chunk_size,
54 bucket_state_table,
55 count_state_table,
56 )
57 .boxed();
58 Ok(Executor::new(params.info, exec))
59 }
60}