risingwave_frontend/optimizer/plan_node/
stream_local_approx_percentile.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::DataType;
18use risingwave_pb::stream_plan::LocalApproxPercentileNode;
19use risingwave_pb::stream_plan::stream_node::PbNodeBody;
20
21use super::StreamPlanRef as PlanRef;
22use crate::error::Result;
23use crate::expr::{ExprRewriter, ExprVisitor, InputRef, InputRefDisplay, Literal};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
26use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
27use crate::optimizer::plan_node::utils::{Distill, childless_record, watermark_pretty};
28use crate::optimizer::plan_node::{
29 ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
30};
31use crate::optimizer::property::{FunctionalDependencySet, WatermarkColumns, reject_upsert_input};
32use crate::stream_fragmenter::BuildFragmentGraphState;
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct StreamLocalApproxPercentile {
38 pub base: PlanBase<Stream>,
39 input: PlanRef,
40 quantile: Literal,
41 relative_error: Literal,
42 percentile_col: InputRef,
43}
44
45impl StreamLocalApproxPercentile {
46 pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Result<Self> {
47 let schema = Schema::new(vec![
48 Field::with_name(DataType::Int16, "sign"),
49 Field::with_name(DataType::Int32, "bucket_id"),
50 Field::with_name(DataType::Int32, "count"),
51 ]);
52 let watermark_columns = WatermarkColumns::new();
54 let functional_dependency = FunctionalDependencySet::with_key(3, &[]);
55 let base = PlanBase::new_stream(
56 input.ctx(),
57 schema,
58 input.stream_key().map(|k| k.to_vec()),
59 functional_dependency,
60 input.distribution().clone(),
61 reject_upsert_input!(input),
62 input.emit_on_window_close(),
63 watermark_columns,
64 input.columns_monotonicity().clone(),
65 );
66 Ok(Self {
67 base,
68 input,
69 quantile: approx_percentile_agg_call.direct_args[0].clone(),
70 relative_error: approx_percentile_agg_call.direct_args[1].clone(),
71 percentile_col: approx_percentile_agg_call.inputs[0].clone(),
72 })
73 }
74}
75
76impl Distill for StreamLocalApproxPercentile {
77 fn distill<'a>(&self) -> XmlNode<'a> {
78 let mut out = Vec::with_capacity(5);
79 out.push((
80 "percentile_col",
81 Pretty::display(&InputRefDisplay {
82 input_ref: &self.percentile_col,
83 input_schema: self.input.schema(),
84 }),
85 ));
86 out.push(("quantile", Pretty::debug(&self.quantile)));
87 out.push(("relative_error", Pretty::debug(&self.relative_error)));
88 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
89 out.push(("output_watermarks", ow));
90 }
91 childless_record("StreamLocalApproxPercentile", out)
92 }
93}
94
95impl PlanTreeNodeUnary<Stream> for StreamLocalApproxPercentile {
96 fn input(&self) -> PlanRef {
97 self.input.clone()
98 }
99
100 fn clone_with_input(&self, input: PlanRef) -> Self {
101 Self {
102 base: self.base.clone(),
103 input,
104 quantile: self.quantile.clone(),
105 relative_error: self.relative_error.clone(),
106 percentile_col: self.percentile_col.clone(),
107 }
108 }
109}
110
111impl_plan_tree_node_for_unary! { Stream, StreamLocalApproxPercentile}
112
113impl StreamNode for StreamLocalApproxPercentile {
114 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
115 let relative_error = self.relative_error.get_data().as_ref().unwrap();
116 let relative_error = relative_error.as_float64().into_inner();
117 let base = (1.0 + relative_error) / (1.0 - relative_error);
118 let percentile_index = self.percentile_col.index() as u32;
119 let body = LocalApproxPercentileNode {
120 base,
121 percentile_index,
122 };
123 PbNodeBody::LocalApproxPercentile(Box::new(body))
124 }
125}
126
127impl ExprRewritable<Stream> for StreamLocalApproxPercentile {
128 fn has_rewritable_expr(&self) -> bool {
129 false
130 }
131
132 fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
133 unimplemented!()
134 }
135}
136
137impl ExprVisitable for StreamLocalApproxPercentile {
138 fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
139}