risingwave_frontend/optimizer/plan_node/
stream_local_approx_percentile.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::DataType;
18use risingwave_pb::stream_plan::LocalApproxPercentileNode;
19use risingwave_pb::stream_plan::stream_node::PbNodeBody;
20
21use super::StreamPlanRef as PlanRef;
22use crate::error::Result;
23use crate::expr::{ExprRewriter, ExprVisitor, InputRef, InputRefDisplay, Literal};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::generic::GenericPlanRef;
26use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
27use crate::optimizer::plan_node::utils::{Distill, childless_record, watermark_pretty};
28use crate::optimizer::plan_node::{
29    ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
30};
31use crate::optimizer::property::{
32    Distribution, FunctionalDependencySet, MonotonicityMap, StreamKind, WatermarkColumns,
33    reject_upsert_input,
34};
35use crate::stream_fragmenter::BuildFragmentGraphState;
36
/// Stream plan node for the local phase of an approximate-percentile aggregation.
///
/// As built by `new`, it outputs three columns — `sign` (`Int16`), `bucket_id` (`Int32`)
/// and `count` (`Int32`) — as an append-only stream for the global phase to consume.
// Does not contain `core` because no other plan nodes share
// common fields and schema, even GlobalApproxPercentile.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamLocalApproxPercentile {
    /// Stream plan metadata (schema, distribution, stream kind, ...) created in `new`.
    pub base: PlanBase<Stream>,
    /// The single upstream plan node.
    input: PlanRef,
    /// First direct argument of the aggregate call. Within this file it is only used
    /// for display in `distill`; it is not written into the protobuf body.
    quantile: Literal,
    /// Second direct argument of the aggregate call; `to_stream_prost_body` derives the
    /// `base` value of the protobuf node from it.
    relative_error: Literal,
    /// First input of the aggregate call: the input column the percentile is computed over.
    percentile_col: InputRef,
}
47
48impl StreamLocalApproxPercentile {
49    pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Result<Self> {
50        reject_upsert_input!(input);
51
52        let schema = Schema::new(vec![
53            Field::with_name(DataType::Int16, "sign"),
54            Field::with_name(DataType::Int32, "bucket_id"),
55            Field::with_name(DataType::Int32, "count"),
56        ]);
57        let base = PlanBase::new_stream(
58            input.ctx(),
59            schema,
60            Some(vec![]),
61            FunctionalDependencySet::new(3),
62            Distribution::SomeShard,
63            // Local phase outputs the delta as `Insert` rows to the global phase.
64            StreamKind::AppendOnly,
65            input.emit_on_window_close(),
66            WatermarkColumns::new(),
67            MonotonicityMap::new(),
68        );
69        Ok(Self {
70            base,
71            input,
72            quantile: approx_percentile_agg_call.direct_args[0].clone(),
73            relative_error: approx_percentile_agg_call.direct_args[1].clone(),
74            percentile_col: approx_percentile_agg_call.inputs[0].clone(),
75        })
76    }
77}
78
79impl Distill for StreamLocalApproxPercentile {
80    fn distill<'a>(&self) -> XmlNode<'a> {
81        let mut out = Vec::with_capacity(5);
82        out.push((
83            "percentile_col",
84            Pretty::display(&InputRefDisplay {
85                input_ref: &self.percentile_col,
86                input_schema: self.input.schema(),
87            }),
88        ));
89        out.push(("quantile", Pretty::debug(&self.quantile)));
90        out.push(("relative_error", Pretty::debug(&self.relative_error)));
91        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
92            out.push(("output_watermarks", ow));
93        }
94        childless_record("StreamLocalApproxPercentile", out)
95    }
96}
97
98impl PlanTreeNodeUnary<Stream> for StreamLocalApproxPercentile {
99    fn input(&self) -> PlanRef {
100        self.input.clone()
101    }
102
103    fn clone_with_input(&self, input: PlanRef) -> Self {
104        Self {
105            base: self.base.clone(),
106            input,
107            quantile: self.quantile.clone(),
108            relative_error: self.relative_error.clone(),
109            percentile_col: self.percentile_col.clone(),
110        }
111    }
112}
113
// Macro-generated plan-tree-node boilerplate for this unary stream node.
// NOTE(review): the macro body is not visible in this file — presumably it builds the
// generic tree-node impl from the `PlanTreeNodeUnary<Stream>` impl above; confirm against
// the macro definition.
impl_plan_tree_node_for_unary! { Stream, StreamLocalApproxPercentile}
115
116impl StreamNode for StreamLocalApproxPercentile {
117    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
118        let relative_error = self.relative_error.get_data().as_ref().unwrap();
119        let relative_error = relative_error.as_float64().into_inner();
120        let base = (1.0 + relative_error) / (1.0 - relative_error);
121        let percentile_index = self.percentile_col.index() as u32;
122        let body = LocalApproxPercentileNode {
123            base,
124            percentile_index,
125        };
126        PbNodeBody::LocalApproxPercentile(Box::new(body))
127    }
128}
129
130impl ExprRewritable<Stream> for StreamLocalApproxPercentile {
131    fn has_rewritable_expr(&self) -> bool {
132        false
133    }
134
135    fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
136        unimplemented!()
137    }
138}
139
140impl ExprVisitable for StreamLocalApproxPercentile {
141    fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
142}