risingwave_frontend/optimizer/plan_node/
stream_global_approx_percentile.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use pretty_xmlish::{Pretty, XmlNode};
15use risingwave_common::catalog::{Field, Schema};
16use risingwave_common::types::DataType;
17use risingwave_common::util::sort_util::OrderType;
18use risingwave_pb::stream_plan::GlobalApproxPercentileNode;
19use risingwave_pb::stream_plan::stream_node::PbNodeBody;
20
21use super::StreamPlanRef as PlanRef;
22use crate::expr::{ExprRewriter, ExprVisitor, Literal};
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::generic::GenericPlanRef;
25use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
26use crate::optimizer::plan_node::utils::{Distill, TableCatalogBuilder, childless_record};
27use crate::optimizer::plan_node::{
28    ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
29};
30use crate::optimizer::property::{
31    Distribution, FunctionalDependencySet, MonotonicityMap, StreamKind, WatermarkColumns,
32};
33use crate::stream_fragmenter::BuildFragmentGraphState;
34
/// Stream plan node for the global (final) phase of `approx_percentile`.
///
/// Its input must be append-only (asserted in `new`), and it outputs a single
/// `Float64` column named `approx_percentile`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamGlobalApproxPercentile {
    pub base: PlanBase<Stream>,
    /// Append-only input; expected to be the local phase of the aggregation.
    input: PlanRef,
    /// Quantile to compute; taken from the first direct argument of the aggregate call.
    quantile: Literal,
    /// Relative error; taken from the second direct argument of the aggregate call.
    /// Used to compute the exponent bucket base `(1 + e) / (1 - e)`.
    relative_error: Literal,
}
44
45impl StreamGlobalApproxPercentile {
46    pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self {
47        assert!(
48            input.stream_kind().is_append_only(),
49            "the input of GlobalApproxPercentile must be the local phase, which is append-only"
50        );
51        let schema = Schema::new(vec![Field::with_name(
52            DataType::Float64,
53            "approx_percentile",
54        )]);
55
56        let base = PlanBase::new_stream(
57            input.ctx(),
58            schema,
59            Some(vec![]),
60            FunctionalDependencySet::new(1),
61            Distribution::Single,
62            StreamKind::Retract,
63            input.emit_on_window_close(),
64            WatermarkColumns::new(),
65            MonotonicityMap::new(),
66        );
67        Self {
68            base,
69            input,
70            quantile: approx_percentile_agg_call.direct_args[0].clone(),
71            relative_error: approx_percentile_agg_call.direct_args[1].clone(),
72        }
73    }
74}
75
76impl Distill for StreamGlobalApproxPercentile {
77    fn distill<'a>(&self) -> XmlNode<'a> {
78        let out = vec![
79            ("quantile", Pretty::debug(&self.quantile)),
80            ("relative_error", Pretty::debug(&self.relative_error)),
81        ];
82        childless_record("StreamGlobalApproxPercentile", out)
83    }
84}
85
86impl PlanTreeNodeUnary<Stream> for StreamGlobalApproxPercentile {
87    fn input(&self) -> PlanRef {
88        self.input.clone()
89    }
90
91    fn clone_with_input(&self, input: PlanRef) -> Self {
92        Self {
93            base: self.base.clone(),
94            input,
95            quantile: self.quantile.clone(),
96            relative_error: self.relative_error.clone(),
97        }
98    }
99}
100
101impl_plan_tree_node_for_unary! { Stream, StreamGlobalApproxPercentile}
102
103impl StreamNode for StreamGlobalApproxPercentile {
104    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
105        let relative_error = self.relative_error.get_data().as_ref().unwrap();
106        let relative_error = relative_error.as_float64().into_inner();
107        let base = (1.0 + relative_error) / (1.0 - relative_error);
108        let quantile = self.quantile.get_data().as_ref().unwrap();
109        let quantile = quantile.as_float64().into_inner();
110
111        // setup table: bucket_id->count
112        let mut bucket_table_builder = TableCatalogBuilder::default();
113        bucket_table_builder.add_column(&Field::with_name(DataType::Int16, "sign"));
114        bucket_table_builder.add_column(&Field::with_name(DataType::Int32, "bucket_id"));
115        bucket_table_builder.add_column(&Field::with_name(DataType::Int64, "count"));
116        bucket_table_builder.add_order_column(0, OrderType::ascending()); // sign
117        bucket_table_builder.add_order_column(1, OrderType::ascending()); // bucket_id
118
119        // setup table: total_count
120        let mut count_table_builder = TableCatalogBuilder::default();
121        count_table_builder.add_column(&Field::with_name(DataType::Int64, "total_count"));
122
123        let body = GlobalApproxPercentileNode {
124            base,
125            quantile,
126            bucket_state_table: Some(
127                bucket_table_builder
128                    .build(vec![], 0)
129                    .with_id(state.gen_table_id_wrapped())
130                    .to_internal_table_prost(),
131            ),
132            count_state_table: Some(
133                count_table_builder
134                    .build(vec![], 0)
135                    .with_id(state.gen_table_id_wrapped())
136                    .to_internal_table_prost(),
137            ),
138        };
139        PbNodeBody::GlobalApproxPercentile(Box::new(body))
140    }
141}
142
143impl ExprRewritable<Stream> for StreamGlobalApproxPercentile {
144    fn has_rewritable_expr(&self) -> bool {
145        false
146    }
147
148    fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
149        unimplemented!()
150    }
151}
152
153impl ExprVisitable for StreamGlobalApproxPercentile {
154    fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
155}