risingwave_frontend/optimizer/plan_node/
batch_get_channel_delta_stats.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::Schema;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::utils::{Distill, childless_record};
21use super::{
22 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeLeaf, ToBatchPb,
23 ToDistributedBatch,
24};
25use crate::error::Result;
26use crate::optimizer::plan_node::ToLocalBatch;
27use crate::optimizer::property::{Distribution, Order};
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct BatchGetChannelDeltaStats {
33 pub base: PlanBase<Batch>,
34 pub at_time: Option<u64>,
35 pub time_offset: Option<u64>,
36}
37
38impl PlanTreeNodeLeaf for BatchGetChannelDeltaStats {}
39impl_plan_tree_node_for_leaf! { Batch, BatchGetChannelDeltaStats }
40
41impl BatchGetChannelDeltaStats {
42 pub fn new(
43 ctx: crate::OptimizerContextRef,
44 schema: Schema,
45 at_time: Option<u64>,
46 time_offset: Option<u64>,
47 ) -> Self {
48 Self::with_dist(ctx, schema, at_time, time_offset, Distribution::Single)
49 }
50
51 pub fn with_dist(
52 ctx: crate::OptimizerContextRef,
53 schema: Schema,
54 at_time: Option<u64>,
55 time_offset: Option<u64>,
56 dist: Distribution,
57 ) -> Self {
58 let base = PlanBase::new_batch(ctx, schema, dist, Order::any());
59 Self {
60 base,
61 at_time,
62 time_offset,
63 }
64 }
65
66 pub fn at_time(&self) -> Option<u64> {
68 self.at_time
69 }
70
71 pub fn time_offset(&self) -> Option<u64> {
73 self.time_offset
74 }
75}
76
77impl Distill for BatchGetChannelDeltaStats {
78 fn distill<'a>(&self) -> XmlNode<'a> {
79 let fields = vec![
80 ("at_time", Pretty::debug(&self.at_time)),
81 ("time_offset", Pretty::debug(&self.time_offset)),
82 ];
83 childless_record("BatchGetChannelDeltaStats", fields)
84 }
85}
86
87impl ToDistributedBatch for BatchGetChannelDeltaStats {
88 fn to_distributed(&self) -> Result<PlanRef> {
89 Ok(Self::with_dist(
90 self.base.ctx().clone(),
91 self.base.schema().clone(),
92 self.at_time,
93 self.time_offset,
94 Distribution::Single,
95 )
96 .into())
97 }
98}
99
100impl ToBatchPb for BatchGetChannelDeltaStats {
101 fn to_batch_prost_body(&self) -> NodeBody {
102 use risingwave_pb::batch_plan::GetChannelDeltaStatsNode;
103
104 NodeBody::GetChannelDeltaStats(GetChannelDeltaStatsNode {
105 at_time: self.at_time,
106 time_offset: self.time_offset,
107 })
108 }
109}
110
111impl ToLocalBatch for BatchGetChannelDeltaStats {
112 fn to_local(&self) -> Result<PlanRef> {
113 Ok(Self::with_dist(
114 self.base.ctx().clone(),
115 self.base.schema().clone(),
116 self.at_time,
117 self.time_offset,
118 Distribution::Single,
119 )
120 .into())
121 }
122}
123
124impl ExprRewritable<Batch> for BatchGetChannelDeltaStats {
125 fn has_rewritable_expr(&self) -> bool {
126 false
127 }
128
129 fn rewrite_exprs(&self, _r: &mut dyn crate::expr::ExprRewriter) -> PlanRef {
130 self.clone().into()
131 }
132}
133
134impl crate::optimizer::plan_node::expr_visitable::ExprVisitable for BatchGetChannelDeltaStats {
135 fn visit_exprs(&self, _v: &mut dyn crate::expr::ExprVisitor) {
136 }
138}