risingwave_stream/from_proto/approx_percentile/
global.rs1use risingwave_pb::stream_plan::GlobalApproxPercentileNode;
16
17use crate::common::table::state_table::StateTable;
18use crate::executor::GlobalApproxPercentileExecutor;
19use crate::from_proto::*;
20
21pub struct GlobalApproxPercentileExecutorBuilder;
22
23impl ExecutorBuilder for GlobalApproxPercentileExecutorBuilder {
24 type Node = GlobalApproxPercentileNode;
25
26 async fn new_boxed_executor(
27 params: ExecutorParams,
28 node: &Self::Node,
29 store: impl StateStore,
30 ) -> StreamResult<Executor> {
31 let [input]: [_; 1] = params.input.try_into().unwrap();
32 let bucket_table = node
33 .bucket_state_table
34 .as_ref()
35 .expect("bucket_state_table not provided");
36 let count_table = node
37 .count_state_table
38 .as_ref()
39 .expect("count_state_table not provided");
40 let bucket_state_table =
41 StateTable::from_table_catalog(bucket_table, store.clone(), None).await;
42 let count_state_table = StateTable::from_table_catalog(count_table, store, None).await;
43 let exec = GlobalApproxPercentileExecutor::new(
44 params.actor_context,
45 input,
46 node.quantile,
47 node.base,
48 params.env.config().developer.chunk_size,
49 bucket_state_table,
50 count_state_table,
51 )
52 .boxed();
53 Ok(Executor::new(params.info, exec))
54 }
55}