risingwave_frontend/optimizer/plan_node/
stream_local_approx_percentile.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::DataType;
18use risingwave_pb::stream_plan::LocalApproxPercentileNode;
19use risingwave_pb::stream_plan::stream_node::PbNodeBody;
20
21use super::StreamPlanRef as PlanRef;
22use crate::error::Result;
23use crate::expr::{ExprRewriter, ExprVisitor, InputRef, InputRefDisplay, Literal};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::generic::GenericPlanRef;
26use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
27use crate::optimizer::plan_node::utils::{Distill, childless_record, watermark_pretty};
28use crate::optimizer::plan_node::{
29 ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
30};
31use crate::optimizer::property::{
32 Distribution, FunctionalDependencySet, MonotonicityMap, StreamKind, WatermarkColumns,
33 reject_upsert_input,
34};
35use crate::stream_fragmenter::BuildFragmentGraphState;
36
37#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40pub struct StreamLocalApproxPercentile {
41 pub base: PlanBase<Stream>,
42 input: PlanRef,
43 quantile: Literal,
44 relative_error: Literal,
45 percentile_col: InputRef,
46}
47
48impl StreamLocalApproxPercentile {
49 pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Result<Self> {
50 reject_upsert_input!(input);
51
52 let schema = Schema::new(vec![
53 Field::with_name(DataType::Int16, "sign"),
54 Field::with_name(DataType::Int32, "bucket_id"),
55 Field::with_name(DataType::Int32, "count"),
56 ]);
57 let base = PlanBase::new_stream(
58 input.ctx(),
59 schema,
60 Some(vec![]),
61 FunctionalDependencySet::new(3),
62 Distribution::SomeShard,
63 StreamKind::AppendOnly,
65 input.emit_on_window_close(),
66 WatermarkColumns::new(),
67 MonotonicityMap::new(),
68 );
69 Ok(Self {
70 base,
71 input,
72 quantile: approx_percentile_agg_call.direct_args[0].clone(),
73 relative_error: approx_percentile_agg_call.direct_args[1].clone(),
74 percentile_col: approx_percentile_agg_call.inputs[0].clone(),
75 })
76 }
77}
78
79impl Distill for StreamLocalApproxPercentile {
80 fn distill<'a>(&self) -> XmlNode<'a> {
81 let mut out = Vec::with_capacity(5);
82 out.push((
83 "percentile_col",
84 Pretty::display(&InputRefDisplay {
85 input_ref: &self.percentile_col,
86 input_schema: self.input.schema(),
87 }),
88 ));
89 out.push(("quantile", Pretty::debug(&self.quantile)));
90 out.push(("relative_error", Pretty::debug(&self.relative_error)));
91 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
92 out.push(("output_watermarks", ow));
93 }
94 childless_record("StreamLocalApproxPercentile", out)
95 }
96}
97
98impl PlanTreeNodeUnary<Stream> for StreamLocalApproxPercentile {
99 fn input(&self) -> PlanRef {
100 self.input.clone()
101 }
102
103 fn clone_with_input(&self, input: PlanRef) -> Self {
104 Self {
105 base: self.base.clone(),
106 input,
107 quantile: self.quantile.clone(),
108 relative_error: self.relative_error.clone(),
109 percentile_col: self.percentile_col.clone(),
110 }
111 }
112}
113
114impl_plan_tree_node_for_unary! { Stream, StreamLocalApproxPercentile}
115
116impl StreamNode for StreamLocalApproxPercentile {
117 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
118 let relative_error = self.relative_error.get_data().as_ref().unwrap();
119 let relative_error = relative_error.as_float64().into_inner();
120 let base = (1.0 + relative_error) / (1.0 - relative_error);
121 let percentile_index = self.percentile_col.index() as u32;
122 let body = LocalApproxPercentileNode {
123 base,
124 percentile_index,
125 };
126 PbNodeBody::LocalApproxPercentile(Box::new(body))
127 }
128}
129
130impl ExprRewritable<Stream> for StreamLocalApproxPercentile {
131 fn has_rewritable_expr(&self) -> bool {
132 false
133 }
134
135 fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
136 unimplemented!()
137 }
138}
139
140impl ExprVisitable for StreamLocalApproxPercentile {
141 fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
142}