risingwave_frontend/optimizer/plan_node/
stream_local_approx_percentile.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::DataType;
18use risingwave_pb::stream_plan::LocalApproxPercentileNode;
19use risingwave_pb::stream_plan::stream_node::PbNodeBody;
20
21use crate::PlanRef;
22use crate::expr::{ExprRewriter, ExprVisitor, InputRef, InputRefDisplay, Literal};
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
25use crate::optimizer::plan_node::stream::StreamPlanRef;
26use crate::optimizer::plan_node::utils::{Distill, childless_record, watermark_pretty};
27use crate::optimizer::plan_node::{
28    ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
29};
30use crate::optimizer::property::{FunctionalDependencySet, WatermarkColumns};
31use crate::stream_fragmenter::BuildFragmentGraphState;
32
// Does not contain `core` because no other plan nodes share
// common fields and schema, even GlobalApproxPercentile.
/// Local approx-percentile stream plan node (the local counterpart of
/// GlobalApproxPercentile).
///
/// Its output schema is fixed to `(sign: int16, bucket_id: int32, count: int32)`
/// independently of the input schema; see `StreamLocalApproxPercentile::new`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamLocalApproxPercentile {
    /// Common stream plan properties (schema, stream key, distribution, watermarks, ...).
    pub base: PlanBase<Stream>,
    /// The single child plan.
    input: PlanRef,
    /// Quantile literal, taken from `direct_args[0]` of the aggregate call.
    quantile: Literal,
    /// Relative-error literal, taken from `direct_args[1]` of the aggregate call.
    relative_error: Literal,
    /// Column of `input` the percentile is computed over, taken from `inputs[0]`
    /// of the aggregate call.
    percentile_col: InputRef,
}
43
44impl StreamLocalApproxPercentile {
45    pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self {
46        let schema = Schema::new(vec![
47            Field::with_name(DataType::Int16, "sign"),
48            Field::with_name(DataType::Int32, "bucket_id"),
49            Field::with_name(DataType::Int32, "count"),
50        ]);
51        // TODO(kwannoel): derive watermark columns?
52        let watermark_columns = WatermarkColumns::new();
53        let functional_dependency = FunctionalDependencySet::with_key(3, &[]);
54        let base = PlanBase::new_stream(
55            input.ctx(),
56            schema,
57            input.stream_key().map(|k| k.to_vec()),
58            functional_dependency,
59            input.distribution().clone(),
60            input.append_only(),
61            input.emit_on_window_close(),
62            watermark_columns,
63            input.columns_monotonicity().clone(),
64        );
65        Self {
66            base,
67            input,
68            quantile: approx_percentile_agg_call.direct_args[0].clone(),
69            relative_error: approx_percentile_agg_call.direct_args[1].clone(),
70            percentile_col: approx_percentile_agg_call.inputs[0].clone(),
71        }
72    }
73}
74
75impl Distill for StreamLocalApproxPercentile {
76    fn distill<'a>(&self) -> XmlNode<'a> {
77        let mut out = Vec::with_capacity(5);
78        out.push((
79            "percentile_col",
80            Pretty::display(&InputRefDisplay {
81                input_ref: &self.percentile_col,
82                input_schema: self.input.schema(),
83            }),
84        ));
85        out.push(("quantile", Pretty::debug(&self.quantile)));
86        out.push(("relative_error", Pretty::debug(&self.relative_error)));
87        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
88            out.push(("output_watermarks", ow));
89        }
90        childless_record("StreamLocalApproxPercentile", out)
91    }
92}
93
94impl PlanTreeNodeUnary for StreamLocalApproxPercentile {
95    fn input(&self) -> PlanRef {
96        self.input.clone()
97    }
98
99    fn clone_with_input(&self, input: PlanRef) -> Self {
100        Self {
101            base: self.base.clone(),
102            input,
103            quantile: self.quantile.clone(),
104            relative_error: self.relative_error.clone(),
105            percentile_col: self.percentile_col.clone(),
106        }
107    }
108}
109
110impl_plan_tree_node_for_unary! {StreamLocalApproxPercentile}
111
112impl StreamNode for StreamLocalApproxPercentile {
113    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
114        let relative_error = self.relative_error.get_data().as_ref().unwrap();
115        let relative_error = relative_error.as_float64().into_inner();
116        let base = (1.0 + relative_error) / (1.0 - relative_error);
117        let percentile_index = self.percentile_col.index() as u32;
118        let body = LocalApproxPercentileNode {
119            base,
120            percentile_index,
121        };
122        PbNodeBody::LocalApproxPercentile(Box::new(body))
123    }
124}
125
126impl ExprRewritable for StreamLocalApproxPercentile {
127    fn has_rewritable_expr(&self) -> bool {
128        false
129    }
130
131    fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
132        unimplemented!()
133    }
134}
135
136impl ExprVisitable for StreamLocalApproxPercentile {
137    fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
138}