risingwave_stream/from_proto/approx_percentile/
global.rsuse risingwave_pb::stream_plan::GlobalApproxPercentileNode;
use crate::common::table::state_table::StateTable;
use crate::executor::GlobalApproxPercentileExecutor;
use crate::from_proto::*;
pub struct GlobalApproxPercentileExecutorBuilder;
impl ExecutorBuilder for GlobalApproxPercentileExecutorBuilder {
type Node = GlobalApproxPercentileNode;
async fn new_boxed_executor(
params: ExecutorParams,
node: &Self::Node,
store: impl StateStore,
) -> StreamResult<Executor> {
let [input]: [_; 1] = params.input.try_into().unwrap();
let bucket_table = node
.bucket_state_table
.as_ref()
.expect("bucket_state_table not provided");
let count_table = node
.count_state_table
.as_ref()
.expect("count_state_table not provided");
let bucket_state_table =
StateTable::from_table_catalog(bucket_table, store.clone(), None).await;
let count_state_table = StateTable::from_table_catalog(count_table, store, None).await;
let exec = GlobalApproxPercentileExecutor::new(
params.actor_context,
input,
node.quantile,
node.base,
params.env.config().developer.chunk_size,
bucket_state_table,
count_state_table,
)
.boxed();
Ok(Executor::new(params.info, exec))
}
}