risingwave_frontend/optimizer/plan_node/
stream_local_approx_percentile.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::DataType;
18use risingwave_pb::stream_plan::LocalApproxPercentileNode;
19use risingwave_pb::stream_plan::stream_node::PbNodeBody;
20
21use super::StreamPlanRef as PlanRef;
22use crate::error::Result;
23use crate::expr::{ExprRewriter, ExprVisitor, InputRef, InputRefDisplay, Literal};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
26use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
27use crate::optimizer::plan_node::utils::{Distill, childless_record, watermark_pretty};
28use crate::optimizer::plan_node::{
29    ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
30};
31use crate::optimizer::property::{FunctionalDependencySet, WatermarkColumns, reject_upsert_input};
32use crate::stream_fragmenter::BuildFragmentGraphState;
33
// Does not contain `core` because no other plan nodes share
// common fields and schema, even GlobalApproxPercentile.
/// Stream plan node carrying the parameters of a local approximate-percentile
/// computation over a single input.
///
/// The node's output schema is fixed by [`StreamLocalApproxPercentile::new`]
/// to three columns: `sign` (`Int16`), `bucket_id` (`Int32`) and `count`
/// (`Int32`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamLocalApproxPercentile {
    /// Stream plan properties (schema, stream key, distribution, ...) built in `new`.
    pub base: PlanBase<Stream>,
    /// The single upstream plan node.
    input: PlanRef,
    /// First direct argument of the aggregate call.
    quantile: Literal,
    /// Second direct argument of the aggregate call; used to derive the
    /// `base` value sent in the protobuf node body.
    relative_error: Literal,
    /// First input of the aggregate call: the input column the percentile is taken over.
    percentile_col: InputRef,
}
44
45impl StreamLocalApproxPercentile {
46    pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Result<Self> {
47        let schema = Schema::new(vec![
48            Field::with_name(DataType::Int16, "sign"),
49            Field::with_name(DataType::Int32, "bucket_id"),
50            Field::with_name(DataType::Int32, "count"),
51        ]);
52        // TODO(kwannoel): derive watermark columns?
53        let watermark_columns = WatermarkColumns::new();
54        let functional_dependency = FunctionalDependencySet::with_key(3, &[]);
55        let base = PlanBase::new_stream(
56            input.ctx(),
57            schema,
58            input.stream_key().map(|k| k.to_vec()),
59            functional_dependency,
60            input.distribution().clone(),
61            reject_upsert_input!(input),
62            input.emit_on_window_close(),
63            watermark_columns,
64            input.columns_monotonicity().clone(),
65        );
66        Ok(Self {
67            base,
68            input,
69            quantile: approx_percentile_agg_call.direct_args[0].clone(),
70            relative_error: approx_percentile_agg_call.direct_args[1].clone(),
71            percentile_col: approx_percentile_agg_call.inputs[0].clone(),
72        })
73    }
74}
75
76impl Distill for StreamLocalApproxPercentile {
77    fn distill<'a>(&self) -> XmlNode<'a> {
78        let mut out = Vec::with_capacity(5);
79        out.push((
80            "percentile_col",
81            Pretty::display(&InputRefDisplay {
82                input_ref: &self.percentile_col,
83                input_schema: self.input.schema(),
84            }),
85        ));
86        out.push(("quantile", Pretty::debug(&self.quantile)));
87        out.push(("relative_error", Pretty::debug(&self.relative_error)));
88        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
89            out.push(("output_watermarks", ow));
90        }
91        childless_record("StreamLocalApproxPercentile", out)
92    }
93}
94
95impl PlanTreeNodeUnary<Stream> for StreamLocalApproxPercentile {
96    fn input(&self) -> PlanRef {
97        self.input.clone()
98    }
99
100    fn clone_with_input(&self, input: PlanRef) -> Self {
101        Self {
102            base: self.base.clone(),
103            input,
104            quantile: self.quantile.clone(),
105            relative_error: self.relative_error.clone(),
106            percentile_col: self.percentile_col.clone(),
107        }
108    }
109}
110
111impl_plan_tree_node_for_unary! { Stream, StreamLocalApproxPercentile}
112
113impl StreamNode for StreamLocalApproxPercentile {
114    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
115        let relative_error = self.relative_error.get_data().as_ref().unwrap();
116        let relative_error = relative_error.as_float64().into_inner();
117        let base = (1.0 + relative_error) / (1.0 - relative_error);
118        let percentile_index = self.percentile_col.index() as u32;
119        let body = LocalApproxPercentileNode {
120            base,
121            percentile_index,
122        };
123        PbNodeBody::LocalApproxPercentile(Box::new(body))
124    }
125}
126
127impl ExprRewritable<Stream> for StreamLocalApproxPercentile {
128    fn has_rewritable_expr(&self) -> bool {
129        false
130    }
131
132    fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
133        unimplemented!()
134    }
135}
136
137impl ExprVisitable for StreamLocalApproxPercentile {
138    fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
139}