risingwave_frontend/optimizer/plan_node/
batch_kafka_scan.rs1use std::ops::Bound;
16use std::ops::Bound::{Excluded, Included, Unbounded};
17use std::rc::Rc;
18
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_pb::batch_plan::SourceNode;
21use risingwave_pb::batch_plan::plan_node::NodeBody;
22
23use super::batch::prelude::*;
24use super::utils::{Distill, childless_record, column_names_pretty};
25use super::{
26 ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch, generic,
27};
28use crate::catalog::source_catalog::SourceCatalog;
29use crate::error::Result;
30use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
31use crate::optimizer::property::{Distribution, Order};
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct BatchKafkaScan {
35 pub base: PlanBase<Batch>,
36 pub core: generic::Source,
37
38 kafka_timestamp_range: (Bound<i64>, Bound<i64>),
40}
41
42impl BatchKafkaScan {
43 pub fn new(core: generic::Source, kafka_timestamp_range: (Bound<i64>, Bound<i64>)) -> Self {
44 let base = PlanBase::new_batch_with_core(
45 &core,
46 Distribution::Single,
48 Order::any(),
49 );
50
51 Self {
52 base,
53 core,
54 kafka_timestamp_range,
55 }
56 }
57
58 pub fn column_names(&self) -> Vec<&str> {
59 self.schema().names_str()
60 }
61
62 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
63 self.core.catalog.clone()
64 }
65
66 pub fn kafka_timestamp_range_value(&self) -> (Option<i64>, Option<i64>) {
67 let (lower_bound, upper_bound) = &self.kafka_timestamp_range;
68 let lower_bound = match lower_bound {
69 Included(t) => Some(*t),
70 Excluded(t) => Some(*t - 1),
71 Unbounded => None,
72 };
73
74 let upper_bound = match upper_bound {
75 Included(t) => Some(*t),
76 Excluded(t) => Some(*t + 1),
77 Unbounded => None,
78 };
79 (lower_bound, upper_bound)
80 }
81
82 pub fn clone_with_dist(&self) -> Self {
83 let base = self
84 .base
85 .clone_with_new_distribution(Distribution::SomeShard);
86 Self {
87 base,
88 core: self.core.clone(),
89 kafka_timestamp_range: self.kafka_timestamp_range,
90 }
91 }
92}
93
94impl_plan_tree_node_for_leaf! { BatchKafkaScan }
95
96impl Distill for BatchKafkaScan {
97 fn distill<'a>(&self) -> XmlNode<'a> {
98 let src = Pretty::from(self.source_catalog().unwrap().name.clone());
99 let fields = vec![
100 ("source", src),
101 ("columns", column_names_pretty(self.schema())),
102 ("filter", Pretty::debug(&self.kafka_timestamp_range_value())),
103 ];
104 childless_record("BatchKafkaScan", fields)
105 }
106}
107
108impl ToLocalBatch for BatchKafkaScan {
109 fn to_local(&self) -> Result<PlanRef> {
110 Ok(self.clone_with_dist().into())
111 }
112}
113
114impl ToDistributedBatch for BatchKafkaScan {
115 fn to_distributed(&self) -> Result<PlanRef> {
116 Ok(self.clone_with_dist().into())
117 }
118}
119
120impl ToBatchPb for BatchKafkaScan {
121 fn to_batch_prost_body(&self) -> NodeBody {
122 let source_catalog = self.source_catalog().unwrap();
123 let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
124 NodeBody::Source(SourceNode {
125 source_id: source_catalog.id,
126 info: Some(source_catalog.info.clone()),
127 columns: self
128 .core
129 .column_catalog
130 .iter()
131 .map(|c| c.to_protobuf())
132 .collect(),
133 with_properties,
134 split: vec![],
135 secret_refs,
136 })
137 }
138}
139
140impl ExprRewritable for BatchKafkaScan {}
141
142impl ExprVisitable for BatchKafkaScan {}