risingwave_frontend/optimizer/plan_node/
stream_local_approx_percentile.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::DataType;
18use risingwave_pb::stream_plan::LocalApproxPercentileNode;
19use risingwave_pb::stream_plan::stream_node::PbNodeBody;
20
21use crate::PlanRef;
22use crate::expr::{ExprRewriter, ExprVisitor, InputRef, InputRefDisplay, Literal};
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
25use crate::optimizer::plan_node::stream::StreamPlanRef;
26use crate::optimizer::plan_node::utils::{Distill, childless_record, watermark_pretty};
27use crate::optimizer::plan_node::{
28 ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
29};
30use crate::optimizer::property::{FunctionalDependencySet, WatermarkColumns};
31use crate::stream_fragmenter::BuildFragmentGraphState;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct StreamLocalApproxPercentile {
37 pub base: PlanBase<Stream>,
38 input: PlanRef,
39 quantile: Literal,
40 relative_error: Literal,
41 percentile_col: InputRef,
42}
43
44impl StreamLocalApproxPercentile {
45 pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self {
46 let schema = Schema::new(vec![
47 Field::with_name(DataType::Int16, "sign"),
48 Field::with_name(DataType::Int32, "bucket_id"),
49 Field::with_name(DataType::Int32, "count"),
50 ]);
51 let watermark_columns = WatermarkColumns::new();
53 let functional_dependency = FunctionalDependencySet::with_key(3, &[]);
54 let base = PlanBase::new_stream(
55 input.ctx(),
56 schema,
57 input.stream_key().map(|k| k.to_vec()),
58 functional_dependency,
59 input.distribution().clone(),
60 input.append_only(),
61 input.emit_on_window_close(),
62 watermark_columns,
63 input.columns_monotonicity().clone(),
64 );
65 Self {
66 base,
67 input,
68 quantile: approx_percentile_agg_call.direct_args[0].clone(),
69 relative_error: approx_percentile_agg_call.direct_args[1].clone(),
70 percentile_col: approx_percentile_agg_call.inputs[0].clone(),
71 }
72 }
73}
74
75impl Distill for StreamLocalApproxPercentile {
76 fn distill<'a>(&self) -> XmlNode<'a> {
77 let mut out = Vec::with_capacity(5);
78 out.push((
79 "percentile_col",
80 Pretty::display(&InputRefDisplay {
81 input_ref: &self.percentile_col,
82 input_schema: self.input.schema(),
83 }),
84 ));
85 out.push(("quantile", Pretty::debug(&self.quantile)));
86 out.push(("relative_error", Pretty::debug(&self.relative_error)));
87 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
88 out.push(("output_watermarks", ow));
89 }
90 childless_record("StreamLocalApproxPercentile", out)
91 }
92}
93
94impl PlanTreeNodeUnary for StreamLocalApproxPercentile {
95 fn input(&self) -> PlanRef {
96 self.input.clone()
97 }
98
99 fn clone_with_input(&self, input: PlanRef) -> Self {
100 Self {
101 base: self.base.clone(),
102 input,
103 quantile: self.quantile.clone(),
104 relative_error: self.relative_error.clone(),
105 percentile_col: self.percentile_col.clone(),
106 }
107 }
108}
109
110impl_plan_tree_node_for_unary! {StreamLocalApproxPercentile}
111
112impl StreamNode for StreamLocalApproxPercentile {
113 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
114 let relative_error = self.relative_error.get_data().as_ref().unwrap();
115 let relative_error = relative_error.as_float64().into_inner();
116 let base = (1.0 + relative_error) / (1.0 - relative_error);
117 let percentile_index = self.percentile_col.index() as u32;
118 let body = LocalApproxPercentileNode {
119 base,
120 percentile_index,
121 };
122 PbNodeBody::LocalApproxPercentile(Box::new(body))
123 }
124}
125
126impl ExprRewritable for StreamLocalApproxPercentile {
127 fn has_rewritable_expr(&self) -> bool {
128 false
129 }
130
131 fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
132 unimplemented!()
133 }
134}
135
136impl ExprVisitable for StreamLocalApproxPercentile {
137 fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
138}