risingwave_stream/executor/approx_percentile/
global.rs1use super::global_state::GlobalApproxPercentileState;
16use crate::executor::prelude::*;
17
18pub struct GlobalApproxPercentileExecutor<S: StateStore> {
19 _ctx: ActorContextRef,
20 pub input: Executor,
21 pub quantile: f64,
22 pub base: f64,
23 pub chunk_size: usize,
24 pub state: GlobalApproxPercentileState<S>,
25}
26
27impl<S: StateStore> GlobalApproxPercentileExecutor<S> {
28 pub fn new(
29 _ctx: ActorContextRef,
30 input: Executor,
31 quantile: f64,
32 base: f64,
33 chunk_size: usize,
34 bucket_state_table: StateTable<S>,
35 count_state_table: StateTable<S>,
36 ) -> Self {
37 let global_state =
38 GlobalApproxPercentileState::new(quantile, base, bucket_state_table, count_state_table);
39 Self {
40 _ctx,
41 input,
42 quantile,
43 base,
44 chunk_size,
45 state: global_state,
46 }
47 }
48
49 #[try_stream(ok = Message, error = StreamExecutorError)]
51 async fn execute_inner(self) {
52 let mut input_stream = self.input.execute();
54 let first_barrier = expect_first_barrier(&mut input_stream).await?;
55 let first_epoch = first_barrier.epoch;
56 yield Message::Barrier(first_barrier);
57 let mut state = self.state;
58 state.init(first_epoch).await?;
59
60 #[for_await]
62 for message in input_stream {
63 match message? {
64 Message::Chunk(chunk) => {
65 state.apply_chunk(chunk)?;
66 }
67 Message::Barrier(barrier) => {
68 let output = state.get_output();
69 yield Message::Chunk(output);
70 state.commit(barrier.epoch).await?;
71 yield Message::Barrier(barrier);
72 }
73 Message::Watermark(_) => {}
74 }
75 }
76 }
77}
78
79impl<S: StateStore> Execute for GlobalApproxPercentileExecutor<S> {
80 fn execute(self: Box<Self>) -> BoxedMessageStream {
81 self.execute_inner().boxed()
82 }
83}