risingwave_frontend/optimizer/plan_node/stream_global_approx_percentile.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::GlobalApproxPercentileNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use crate::PlanRef;
use crate::expr::{ExprRewriter, ExprVisitor, Literal};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::{Distill, TableCatalogBuilder, childless_record};
use crate::optimizer::plan_node::{
    ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
};
use crate::optimizer::property::{
    Distribution, FunctionalDependencySet, MonotonicityMap, WatermarkColumns,
};
use crate::stream_fragmenter::BuildFragmentGraphState;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct StreamGlobalApproxPercentile {
35    pub base: PlanBase<Stream>,
36    input: PlanRef,
37    /// Quantile
38    quantile: Literal,
39    /// Used to compute the exponent bucket base.
40    relative_error: Literal,
41}
42
43impl StreamGlobalApproxPercentile {
44    pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self {
45        let schema = Schema::new(vec![Field::with_name(
46            DataType::Float64,
47            "approx_percentile",
48        )]);
49        let functional_dependency = FunctionalDependencySet::with_key(1, &[]);
50        let watermark_columns = WatermarkColumns::new();
51        let base = PlanBase::new_stream(
52            input.ctx(),
53            schema,
54            Some(vec![]),
55            functional_dependency,
56            Distribution::Single,
57            input.append_only(),
58            input.emit_on_window_close(),
59            watermark_columns,
60            input.columns_monotonicity().clone(),
61        );
62        Self {
63            base,
64            input,
65            quantile: approx_percentile_agg_call.direct_args[0].clone(),
66            relative_error: approx_percentile_agg_call.direct_args[1].clone(),
67        }
68    }
69}
70
71impl Distill for StreamGlobalApproxPercentile {
72    fn distill<'a>(&self) -> XmlNode<'a> {
73        let out = vec![
74            ("quantile", Pretty::debug(&self.quantile)),
75            ("relative_error", Pretty::debug(&self.relative_error)),
76        ];
77        childless_record("StreamGlobalApproxPercentile", out)
78    }
79}
80
81impl PlanTreeNodeUnary for StreamGlobalApproxPercentile {
82    fn input(&self) -> PlanRef {
83        self.input.clone()
84    }
85
86    fn clone_with_input(&self, input: PlanRef) -> Self {
87        Self {
88            base: self.base.clone(),
89            input,
90            quantile: self.quantile.clone(),
91            relative_error: self.relative_error.clone(),
92        }
93    }
94}
95
96impl_plan_tree_node_for_unary! {StreamGlobalApproxPercentile}
97
98impl StreamNode for StreamGlobalApproxPercentile {
99    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
100        let relative_error = self.relative_error.get_data().as_ref().unwrap();
101        let relative_error = relative_error.as_float64().into_inner();
102        let base = (1.0 + relative_error) / (1.0 - relative_error);
103        let quantile = self.quantile.get_data().as_ref().unwrap();
104        let quantile = quantile.as_float64().into_inner();
105
106        // setup table: bucket_id->count
107        let mut bucket_table_builder = TableCatalogBuilder::default();
108        bucket_table_builder.add_column(&Field::with_name(DataType::Int16, "sign"));
109        bucket_table_builder.add_column(&Field::with_name(DataType::Int32, "bucket_id"));
110        bucket_table_builder.add_column(&Field::with_name(DataType::Int64, "count"));
111        bucket_table_builder.add_order_column(0, OrderType::ascending()); // sign
112        bucket_table_builder.add_order_column(1, OrderType::ascending()); // bucket_id
113
114        // setup table: total_count
115        let mut count_table_builder = TableCatalogBuilder::default();
116        count_table_builder.add_column(&Field::with_name(DataType::Int64, "total_count"));
117
118        let body = GlobalApproxPercentileNode {
119            base,
120            quantile,
121            bucket_state_table: Some(
122                bucket_table_builder
123                    .build(vec![], 0)
124                    .with_id(state.gen_table_id_wrapped())
125                    .to_internal_table_prost(),
126            ),
127            count_state_table: Some(
128                count_table_builder
129                    .build(vec![], 0)
130                    .with_id(state.gen_table_id_wrapped())
131                    .to_internal_table_prost(),
132            ),
133        };
134        PbNodeBody::GlobalApproxPercentile(Box::new(body))
135    }
136}
137
138impl ExprRewritable for StreamGlobalApproxPercentile {
139    fn has_rewritable_expr(&self) -> bool {
140        false
141    }
142
143    fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
144        unimplemented!()
145    }
146}
147
148impl ExprVisitable for StreamGlobalApproxPercentile {
149    fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
150}