risingwave_frontend/optimizer/plan_node/
stream_global_approx_percentile.rs1use pretty_xmlish::{Pretty, XmlNode};
15use risingwave_common::catalog::{Field, Schema};
16use risingwave_common::types::DataType;
17use risingwave_common::util::sort_util::OrderType;
18use risingwave_pb::stream_plan::GlobalApproxPercentileNode;
19use risingwave_pb::stream_plan::stream_node::PbNodeBody;
20
21use crate::PlanRef;
22use crate::expr::{ExprRewriter, ExprVisitor, Literal};
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::generic::GenericPlanRef;
25use crate::optimizer::plan_node::stream::StreamPlanRef;
26use crate::optimizer::plan_node::utils::{Distill, TableCatalogBuilder, childless_record};
27use crate::optimizer::plan_node::{
28 ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
29};
30use crate::optimizer::property::{Distribution, FunctionalDependencySet, WatermarkColumns};
31use crate::stream_fragmenter::BuildFragmentGraphState;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct StreamGlobalApproxPercentile {
35 pub base: PlanBase<Stream>,
36 input: PlanRef,
37 quantile: Literal,
39 relative_error: Literal,
41}
42
43impl StreamGlobalApproxPercentile {
44 pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self {
45 let schema = Schema::new(vec![Field::with_name(
46 DataType::Float64,
47 "approx_percentile",
48 )]);
49 let functional_dependency = FunctionalDependencySet::with_key(1, &[]);
50 let watermark_columns = WatermarkColumns::new();
51 let base = PlanBase::new_stream(
52 input.ctx(),
53 schema,
54 Some(vec![]),
55 functional_dependency,
56 Distribution::Single,
57 input.append_only(),
58 input.emit_on_window_close(),
59 watermark_columns,
60 input.columns_monotonicity().clone(),
61 );
62 Self {
63 base,
64 input,
65 quantile: approx_percentile_agg_call.direct_args[0].clone(),
66 relative_error: approx_percentile_agg_call.direct_args[1].clone(),
67 }
68 }
69}
70
71impl Distill for StreamGlobalApproxPercentile {
72 fn distill<'a>(&self) -> XmlNode<'a> {
73 let out = vec![
74 ("quantile", Pretty::debug(&self.quantile)),
75 ("relative_error", Pretty::debug(&self.relative_error)),
76 ];
77 childless_record("StreamGlobalApproxPercentile", out)
78 }
79}
80
81impl PlanTreeNodeUnary for StreamGlobalApproxPercentile {
82 fn input(&self) -> PlanRef {
83 self.input.clone()
84 }
85
86 fn clone_with_input(&self, input: PlanRef) -> Self {
87 Self {
88 base: self.base.clone(),
89 input,
90 quantile: self.quantile.clone(),
91 relative_error: self.relative_error.clone(),
92 }
93 }
94}
95
96impl_plan_tree_node_for_unary! {StreamGlobalApproxPercentile}
97
98impl StreamNode for StreamGlobalApproxPercentile {
99 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
100 let relative_error = self.relative_error.get_data().as_ref().unwrap();
101 let relative_error = relative_error.as_float64().into_inner();
102 let base = (1.0 + relative_error) / (1.0 - relative_error);
103 let quantile = self.quantile.get_data().as_ref().unwrap();
104 let quantile = quantile.as_float64().into_inner();
105
106 let mut bucket_table_builder = TableCatalogBuilder::default();
108 bucket_table_builder.add_column(&Field::with_name(DataType::Int16, "sign"));
109 bucket_table_builder.add_column(&Field::with_name(DataType::Int32, "bucket_id"));
110 bucket_table_builder.add_column(&Field::with_name(DataType::Int64, "count"));
111 bucket_table_builder.add_order_column(0, OrderType::ascending()); bucket_table_builder.add_order_column(1, OrderType::ascending()); let mut count_table_builder = TableCatalogBuilder::default();
116 count_table_builder.add_column(&Field::with_name(DataType::Int64, "total_count"));
117
118 let body = GlobalApproxPercentileNode {
119 base,
120 quantile,
121 bucket_state_table: Some(
122 bucket_table_builder
123 .build(vec![], 0)
124 .with_id(state.gen_table_id_wrapped())
125 .to_internal_table_prost(),
126 ),
127 count_state_table: Some(
128 count_table_builder
129 .build(vec![], 0)
130 .with_id(state.gen_table_id_wrapped())
131 .to_internal_table_prost(),
132 ),
133 };
134 PbNodeBody::GlobalApproxPercentile(Box::new(body))
135 }
136}
137
138impl ExprRewritable for StreamGlobalApproxPercentile {
139 fn has_rewritable_expr(&self) -> bool {
140 false
141 }
142
143 fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
144 unimplemented!()
145 }
146}
147
148impl ExprVisitable for StreamGlobalApproxPercentile {
149 fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
150}