risingwave_frontend/optimizer/plan_node/
stream_global_approx_percentile.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::DataType;
18use risingwave_common::util::sort_util::OrderType;
19use risingwave_pb::stream_plan::GlobalApproxPercentileNode;
20use risingwave_pb::stream_plan::stream_node::PbNodeBody;
21
22use super::StreamPlanRef as PlanRef;
23use crate::expr::{ExprRewriter, ExprVisitor, Literal};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::generic::GenericPlanRef;
26use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
27use crate::optimizer::plan_node::utils::{Distill, TableCatalogBuilder, childless_record};
28use crate::optimizer::plan_node::{
29    ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
30};
31use crate::optimizer::property::{
32    Distribution, FunctionalDependencySet, MonotonicityMap, StreamKind, WatermarkColumns,
33};
34use crate::stream_fragmenter::BuildFragmentGraphState;
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct StreamGlobalApproxPercentile {
38    pub base: PlanBase<Stream>,
39    input: PlanRef,
40    /// Quantile
41    quantile: Literal,
42    /// Used to compute the exponent bucket base.
43    relative_error: Literal,
44}
45
46impl StreamGlobalApproxPercentile {
47    pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self {
48        assert!(
49            input.stream_kind().is_append_only(),
50            "the input of GlobalApproxPercentile must be the local phase, which is append-only"
51        );
52        let schema = Schema::new(vec![Field::with_name(
53            DataType::Float64,
54            "approx_percentile",
55        )]);
56
57        let base = PlanBase::new_stream(
58            input.ctx(),
59            schema,
60            Some(vec![]),
61            FunctionalDependencySet::new(1),
62            Distribution::Single,
63            StreamKind::Retract,
64            input.emit_on_window_close(),
65            WatermarkColumns::new(),
66            MonotonicityMap::new(),
67        );
68        Self {
69            base,
70            input,
71            quantile: approx_percentile_agg_call.direct_args[0].clone(),
72            relative_error: approx_percentile_agg_call.direct_args[1].clone(),
73        }
74    }
75}
76
77impl Distill for StreamGlobalApproxPercentile {
78    fn distill<'a>(&self) -> XmlNode<'a> {
79        let out = vec![
80            ("quantile", Pretty::debug(&self.quantile)),
81            ("relative_error", Pretty::debug(&self.relative_error)),
82        ];
83        childless_record("StreamGlobalApproxPercentile", out)
84    }
85}
86
87impl PlanTreeNodeUnary<Stream> for StreamGlobalApproxPercentile {
88    fn input(&self) -> PlanRef {
89        self.input.clone()
90    }
91
92    fn clone_with_input(&self, input: PlanRef) -> Self {
93        Self {
94            base: self.base.clone(),
95            input,
96            quantile: self.quantile.clone(),
97            relative_error: self.relative_error.clone(),
98        }
99    }
100}
101
102impl_plan_tree_node_for_unary! { Stream, StreamGlobalApproxPercentile}
103
104impl StreamNode for StreamGlobalApproxPercentile {
105    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
106        let relative_error = self.relative_error.get_data().as_ref().unwrap();
107        let relative_error = relative_error.as_float64().into_inner();
108        let base = (1.0 + relative_error) / (1.0 - relative_error);
109        let quantile = self.quantile.get_data().as_ref().unwrap();
110        let quantile = quantile.as_float64().into_inner();
111
112        // setup table: bucket_id->count
113        let mut bucket_table_builder = TableCatalogBuilder::default();
114        bucket_table_builder.add_column(&Field::with_name(DataType::Int16, "sign"));
115        bucket_table_builder.add_column(&Field::with_name(DataType::Int32, "bucket_id"));
116        bucket_table_builder.add_column(&Field::with_name(DataType::Int64, "count"));
117        bucket_table_builder.add_order_column(0, OrderType::ascending()); // sign
118        bucket_table_builder.add_order_column(1, OrderType::ascending()); // bucket_id
119
120        // setup table: total_count
121        let mut count_table_builder = TableCatalogBuilder::default();
122        count_table_builder.add_column(&Field::with_name(DataType::Int64, "total_count"));
123
124        let body = GlobalApproxPercentileNode {
125            base,
126            quantile,
127            bucket_state_table: Some(
128                bucket_table_builder
129                    .build(vec![], 0)
130                    .with_id(state.gen_table_id_wrapped())
131                    .to_internal_table_prost(),
132            ),
133            count_state_table: Some(
134                count_table_builder
135                    .build(vec![], 0)
136                    .with_id(state.gen_table_id_wrapped())
137                    .to_internal_table_prost(),
138            ),
139        };
140        PbNodeBody::GlobalApproxPercentile(Box::new(body))
141    }
142}
143
144impl ExprRewritable<Stream> for StreamGlobalApproxPercentile {
145    fn has_rewritable_expr(&self) -> bool {
146        false
147    }
148
149    fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
150        unimplemented!()
151    }
152}
153
154impl ExprVisitable for StreamGlobalApproxPercentile {
155    fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
156}