risingwave_frontend/optimizer/plan_node/
stream_global_approx_percentile.rs1use pretty_xmlish::{Pretty, XmlNode};
15use risingwave_common::catalog::{Field, Schema};
16use risingwave_common::types::DataType;
17use risingwave_common::util::sort_util::OrderType;
18use risingwave_pb::stream_plan::GlobalApproxPercentileNode;
19use risingwave_pb::stream_plan::stream_node::PbNodeBody;
20
21use super::StreamPlanRef as PlanRef;
22use crate::expr::{ExprRewriter, ExprVisitor, Literal};
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::generic::GenericPlanRef;
25use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
26use crate::optimizer::plan_node::utils::{Distill, TableCatalogBuilder, childless_record};
27use crate::optimizer::plan_node::{
28 ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
29};
30use crate::optimizer::property::{
31 Distribution, FunctionalDependencySet, MonotonicityMap, StreamKind, WatermarkColumns,
32};
33use crate::stream_fragmenter::BuildFragmentGraphState;
34
35#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct StreamGlobalApproxPercentile {
37 pub base: PlanBase<Stream>,
38 input: PlanRef,
39 quantile: Literal,
41 relative_error: Literal,
43}
44
45impl StreamGlobalApproxPercentile {
46 pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self {
47 assert!(
48 input.stream_kind().is_append_only(),
49 "the input of GlobalApproxPercentile must be the local phase, which is append-only"
50 );
51 let schema = Schema::new(vec![Field::with_name(
52 DataType::Float64,
53 "approx_percentile",
54 )]);
55
56 let base = PlanBase::new_stream(
57 input.ctx(),
58 schema,
59 Some(vec![]),
60 FunctionalDependencySet::new(1),
61 Distribution::Single,
62 StreamKind::Retract,
63 input.emit_on_window_close(),
64 WatermarkColumns::new(),
65 MonotonicityMap::new(),
66 );
67 Self {
68 base,
69 input,
70 quantile: approx_percentile_agg_call.direct_args[0].clone(),
71 relative_error: approx_percentile_agg_call.direct_args[1].clone(),
72 }
73 }
74}
75
76impl Distill for StreamGlobalApproxPercentile {
77 fn distill<'a>(&self) -> XmlNode<'a> {
78 let out = vec![
79 ("quantile", Pretty::debug(&self.quantile)),
80 ("relative_error", Pretty::debug(&self.relative_error)),
81 ];
82 childless_record("StreamGlobalApproxPercentile", out)
83 }
84}
85
86impl PlanTreeNodeUnary<Stream> for StreamGlobalApproxPercentile {
87 fn input(&self) -> PlanRef {
88 self.input.clone()
89 }
90
91 fn clone_with_input(&self, input: PlanRef) -> Self {
92 Self {
93 base: self.base.clone(),
94 input,
95 quantile: self.quantile.clone(),
96 relative_error: self.relative_error.clone(),
97 }
98 }
99}
100
101impl_plan_tree_node_for_unary! { Stream, StreamGlobalApproxPercentile}
102
103impl StreamNode for StreamGlobalApproxPercentile {
104 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
105 let relative_error = self.relative_error.get_data().as_ref().unwrap();
106 let relative_error = relative_error.as_float64().into_inner();
107 let base = (1.0 + relative_error) / (1.0 - relative_error);
108 let quantile = self.quantile.get_data().as_ref().unwrap();
109 let quantile = quantile.as_float64().into_inner();
110
111 let mut bucket_table_builder = TableCatalogBuilder::default();
113 bucket_table_builder.add_column(&Field::with_name(DataType::Int16, "sign"));
114 bucket_table_builder.add_column(&Field::with_name(DataType::Int32, "bucket_id"));
115 bucket_table_builder.add_column(&Field::with_name(DataType::Int64, "count"));
116 bucket_table_builder.add_order_column(0, OrderType::ascending()); bucket_table_builder.add_order_column(1, OrderType::ascending()); let mut count_table_builder = TableCatalogBuilder::default();
121 count_table_builder.add_column(&Field::with_name(DataType::Int64, "total_count"));
122
123 let body = GlobalApproxPercentileNode {
124 base,
125 quantile,
126 bucket_state_table: Some(
127 bucket_table_builder
128 .build(vec![], 0)
129 .with_id(state.gen_table_id_wrapped())
130 .to_internal_table_prost(),
131 ),
132 count_state_table: Some(
133 count_table_builder
134 .build(vec![], 0)
135 .with_id(state.gen_table_id_wrapped())
136 .to_internal_table_prost(),
137 ),
138 };
139 PbNodeBody::GlobalApproxPercentile(Box::new(body))
140 }
141}
142
143impl ExprRewritable<Stream> for StreamGlobalApproxPercentile {
144 fn has_rewritable_expr(&self) -> bool {
145 false
146 }
147
148 fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
149 unimplemented!()
150 }
151}
152
153impl ExprVisitable for StreamGlobalApproxPercentile {
154 fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
155}