// risingwave_frontend/optimizer/plan_node/batch_kafka_scan.rs
use std::ops::Bound;
16use std::ops::Bound::{Excluded, Included, Unbounded};
17use std::rc::Rc;
18
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_pb::batch_plan::SourceNode;
21use risingwave_pb::batch_plan::plan_node::NodeBody;
22
23use super::batch::prelude::*;
24use super::utils::{Distill, childless_record, column_names_pretty};
25use super::{
26 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToBatchPb, ToDistributedBatch, ToLocalBatch,
27 generic,
28};
29use crate::catalog::source_catalog::SourceCatalog;
30use crate::error::Result;
31use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
32use crate::optimizer::property::{Distribution, Order};
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct BatchKafkaScan {
36 pub base: PlanBase<Batch>,
37 pub core: generic::Source,
38
39 kafka_timestamp_range: (Bound<i64>, Bound<i64>),
41}
42
43impl BatchKafkaScan {
44 pub fn new(core: generic::Source, kafka_timestamp_range: (Bound<i64>, Bound<i64>)) -> Self {
45 let base = PlanBase::new_batch_with_core(
46 &core,
47 Distribution::Single,
49 Order::any(),
50 );
51
52 Self {
53 base,
54 core,
55 kafka_timestamp_range,
56 }
57 }
58
59 pub fn column_names(&self) -> Vec<&str> {
60 self.schema().names_str()
61 }
62
63 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
64 self.core.catalog.clone()
65 }
66
67 pub fn kafka_timestamp_range_value(&self) -> (Option<i64>, Option<i64>) {
68 let (lower_bound, upper_bound) = &self.kafka_timestamp_range;
69 let lower_bound = match lower_bound {
70 Included(t) => Some(*t),
71 Excluded(t) => Some(*t - 1),
72 Unbounded => None,
73 };
74
75 let upper_bound = match upper_bound {
76 Included(t) => Some(*t),
77 Excluded(t) => Some(*t + 1),
78 Unbounded => None,
79 };
80 (lower_bound, upper_bound)
81 }
82
83 pub fn clone_with_dist(&self) -> Self {
84 let base = self
85 .base
86 .clone_with_new_distribution(Distribution::SomeShard);
87 Self {
88 base,
89 core: self.core.clone(),
90 kafka_timestamp_range: self.kafka_timestamp_range,
91 }
92 }
93}
94
// Invokes `impl_plan_tree_node_for_leaf!` for `BatchKafkaScan` in the `Batch`
// convention. The macro is defined elsewhere; presumably it generates the
// plan-tree-node boilerplate for a node without inputs — confirm against the
// macro definition.
95impl_plan_tree_node_for_leaf! { Batch, BatchKafkaScan }
96
97impl Distill for BatchKafkaScan {
98 fn distill<'a>(&self) -> XmlNode<'a> {
99 let src = Pretty::from(self.source_catalog().unwrap().name.clone());
100 let fields = vec![
101 ("source", src),
102 ("columns", column_names_pretty(self.schema())),
103 ("filter", Pretty::debug(&self.kafka_timestamp_range_value())),
104 ];
105 childless_record("BatchKafkaScan", fields)
106 }
107}
108
109impl ToLocalBatch for BatchKafkaScan {
110 fn to_local(&self) -> Result<PlanRef> {
111 Ok(self.clone_with_dist().into())
112 }
113}
114
115impl ToDistributedBatch for BatchKafkaScan {
116 fn to_distributed(&self) -> Result<PlanRef> {
117 Ok(self.clone_with_dist().into())
118 }
119}
120
121impl ToBatchPb for BatchKafkaScan {
122 fn to_batch_prost_body(&self) -> NodeBody {
123 let source_catalog = self.source_catalog().unwrap();
124 let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
125 NodeBody::Source(SourceNode {
126 source_id: source_catalog.id,
127 info: Some(source_catalog.info.clone()),
128 columns: self
129 .core
130 .column_catalog
131 .iter()
132 .map(|c| c.to_protobuf())
133 .collect(),
134 with_properties,
135 split: vec![],
136 secret_refs,
137 })
138 }
139}
140
// Empty impl: every `ExprRewritable<Batch>` method falls back to the trait's
// default. NOTE(review): presumably sufficient because this node stores no
// expressions to rewrite — confirm against the trait definition.
141impl ExprRewritable<Batch> for BatchKafkaScan {}
142
// Empty impl: every `ExprVisitable` method falls back to the trait's default.
// NOTE(review): presumably sufficient because this node stores no expressions
// to visit — confirm against the trait definition.
143impl ExprVisitable for BatchKafkaScan {}