risingwave_frontend/optimizer/plan_node/
stream_global_approx_percentile.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::DataType;
18use risingwave_common::util::sort_util::OrderType;
19use risingwave_pb::stream_plan::GlobalApproxPercentileNode;
20use risingwave_pb::stream_plan::stream_node::PbNodeBody;
21
22use super::StreamPlanRef as PlanRef;
23use crate::expr::{ExprRewriter, ExprVisitor, Literal};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::generic::GenericPlanRef;
26use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
27use crate::optimizer::plan_node::utils::{Distill, TableCatalogBuilder, childless_record};
28use crate::optimizer::plan_node::{
29 ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
30};
31use crate::optimizer::property::{
32 Distribution, FunctionalDependencySet, MonotonicityMap, StreamKind, WatermarkColumns,
33};
34use crate::stream_fragmenter::BuildFragmentGraphState;
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct StreamGlobalApproxPercentile {
38 pub base: PlanBase<Stream>,
39 input: PlanRef,
40 quantile: Literal,
42 relative_error: Literal,
44}
45
46impl StreamGlobalApproxPercentile {
47 pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self {
48 assert!(
49 input.stream_kind().is_append_only(),
50 "the input of GlobalApproxPercentile must be the local phase, which is append-only"
51 );
52 let schema = Schema::new(vec![Field::with_name(
53 DataType::Float64,
54 "approx_percentile",
55 )]);
56
57 let base = PlanBase::new_stream(
58 input.ctx(),
59 schema,
60 Some(vec![]),
61 FunctionalDependencySet::new(1),
62 Distribution::Single,
63 StreamKind::Retract,
64 input.emit_on_window_close(),
65 WatermarkColumns::new(),
66 MonotonicityMap::new(),
67 );
68 Self {
69 base,
70 input,
71 quantile: approx_percentile_agg_call.direct_args[0].clone(),
72 relative_error: approx_percentile_agg_call.direct_args[1].clone(),
73 }
74 }
75}
76
77impl Distill for StreamGlobalApproxPercentile {
78 fn distill<'a>(&self) -> XmlNode<'a> {
79 let out = vec![
80 ("quantile", Pretty::debug(&self.quantile)),
81 ("relative_error", Pretty::debug(&self.relative_error)),
82 ];
83 childless_record("StreamGlobalApproxPercentile", out)
84 }
85}
86
87impl PlanTreeNodeUnary<Stream> for StreamGlobalApproxPercentile {
88 fn input(&self) -> PlanRef {
89 self.input.clone()
90 }
91
92 fn clone_with_input(&self, input: PlanRef) -> Self {
93 Self {
94 base: self.base.clone(),
95 input,
96 quantile: self.quantile.clone(),
97 relative_error: self.relative_error.clone(),
98 }
99 }
100}
101
102impl_plan_tree_node_for_unary! { Stream, StreamGlobalApproxPercentile}
103
104impl StreamNode for StreamGlobalApproxPercentile {
105 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
106 let relative_error = self.relative_error.get_data().as_ref().unwrap();
107 let relative_error = relative_error.as_float64().into_inner();
108 let base = (1.0 + relative_error) / (1.0 - relative_error);
109 let quantile = self.quantile.get_data().as_ref().unwrap();
110 let quantile = quantile.as_float64().into_inner();
111
112 let mut bucket_table_builder = TableCatalogBuilder::default();
114 bucket_table_builder.add_column(&Field::with_name(DataType::Int16, "sign"));
115 bucket_table_builder.add_column(&Field::with_name(DataType::Int32, "bucket_id"));
116 bucket_table_builder.add_column(&Field::with_name(DataType::Int64, "count"));
117 bucket_table_builder.add_order_column(0, OrderType::ascending()); bucket_table_builder.add_order_column(1, OrderType::ascending()); let mut count_table_builder = TableCatalogBuilder::default();
122 count_table_builder.add_column(&Field::with_name(DataType::Int64, "total_count"));
123
124 let body = GlobalApproxPercentileNode {
125 base,
126 quantile,
127 bucket_state_table: Some(
128 bucket_table_builder
129 .build(vec![], 0)
130 .with_id(state.gen_table_id_wrapped())
131 .to_internal_table_prost(),
132 ),
133 count_state_table: Some(
134 count_table_builder
135 .build(vec![], 0)
136 .with_id(state.gen_table_id_wrapped())
137 .to_internal_table_prost(),
138 ),
139 };
140 PbNodeBody::GlobalApproxPercentile(Box::new(body))
141 }
142}
143
144impl ExprRewritable<Stream> for StreamGlobalApproxPercentile {
145 fn has_rewritable_expr(&self) -> bool {
146 false
147 }
148
149 fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
150 unimplemented!()
151 }
152}
153
154impl ExprVisitable for StreamGlobalApproxPercentile {
155 fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
156}