risingwave_frontend/optimizer/plan_node/
batch_kafka_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound;
16use std::ops::Bound::{Excluded, Included, Unbounded};
17use std::rc::Rc;
18
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_pb::batch_plan::SourceNode;
21use risingwave_pb::batch_plan::plan_node::NodeBody;
22
23use super::batch::prelude::*;
24use super::utils::{Distill, childless_record, column_names_pretty};
25use super::{
26    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToBatchPb, ToDistributedBatch, ToLocalBatch,
27    generic,
28};
29use crate::catalog::source_catalog::SourceCatalog;
30use crate::error::Result;
31use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
32use crate::optimizer::property::{Distribution, Order};
33
/// Batch plan node for scanning a Kafka source, carrying an optional Kafka timestamp range
/// alongside the generic source description.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchKafkaScan {
    /// Plan-node base for the batch plan; `new` builds it from `core`.
    pub base: PlanBase<Batch>,
    /// Generic source description; this file reads its `catalog` and `column_catalog`.
    pub core: generic::Source,

    /// Kafka timestamp range, stored as a `(lower, upper)` pair of bounds.
    // NOTE(review): the unit of the `i64` timestamps is not visible in this file
    // (presumably milliseconds) — confirm against the code that builds the range.
    kafka_timestamp_range: (Bound<i64>, Bound<i64>),
}
42
43impl BatchKafkaScan {
44    pub fn new(core: generic::Source, kafka_timestamp_range: (Bound<i64>, Bound<i64>)) -> Self {
45        let base = PlanBase::new_batch_with_core(
46            &core,
47            // Use `Single` by default, will be updated later with `clone_with_dist`.
48            Distribution::Single,
49            Order::any(),
50        );
51
52        Self {
53            base,
54            core,
55            kafka_timestamp_range,
56        }
57    }
58
59    pub fn column_names(&self) -> Vec<&str> {
60        self.schema().names_str()
61    }
62
63    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
64        self.core.catalog.clone()
65    }
66
67    pub fn kafka_timestamp_range_value(&self) -> (Option<i64>, Option<i64>) {
68        let (lower_bound, upper_bound) = &self.kafka_timestamp_range;
69        let lower_bound = match lower_bound {
70            Included(t) => Some(*t),
71            Excluded(t) => Some(*t - 1),
72            Unbounded => None,
73        };
74
75        let upper_bound = match upper_bound {
76            Included(t) => Some(*t),
77            Excluded(t) => Some(*t + 1),
78            Unbounded => None,
79        };
80        (lower_bound, upper_bound)
81    }
82
83    pub fn clone_with_dist(&self) -> Self {
84        let base = self
85            .base
86            .clone_with_new_distribution(Distribution::SomeShard);
87        Self {
88            base,
89            core: self.core.clone(),
90            kafka_timestamp_range: self.kafka_timestamp_range,
91        }
92    }
93}
94
// Invokes the leaf-node plan-tree macro for `BatchKafkaScan` with the `Batch` marker type.
// NOTE(review): presumably this generates the plan-tree-node impls for a node without
// inputs — the macro is defined elsewhere; confirm there.
impl_plan_tree_node_for_leaf! { Batch, BatchKafkaScan }
96
97impl Distill for BatchKafkaScan {
98    fn distill<'a>(&self) -> XmlNode<'a> {
99        let src = Pretty::from(self.source_catalog().unwrap().name.clone());
100        let fields = vec![
101            ("source", src),
102            ("columns", column_names_pretty(self.schema())),
103            ("filter", Pretty::debug(&self.kafka_timestamp_range_value())),
104        ];
105        childless_record("BatchKafkaScan", fields)
106    }
107}
108
109impl ToLocalBatch for BatchKafkaScan {
110    fn to_local(&self) -> Result<PlanRef> {
111        Ok(self.clone_with_dist().into())
112    }
113}
114
115impl ToDistributedBatch for BatchKafkaScan {
116    fn to_distributed(&self) -> Result<PlanRef> {
117        Ok(self.clone_with_dist().into())
118    }
119}
120
121impl ToBatchPb for BatchKafkaScan {
122    fn to_batch_prost_body(&self) -> NodeBody {
123        let source_catalog = self.source_catalog().unwrap();
124        let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
125        NodeBody::Source(SourceNode {
126            source_id: source_catalog.id,
127            info: Some(source_catalog.info.clone()),
128            columns: self
129                .core
130                .column_catalog
131                .iter()
132                .map(|c| c.to_protobuf())
133                .collect(),
134            with_properties,
135            split: vec![],
136            secret_refs,
137        })
138    }
139}
140
// Empty impl: `BatchKafkaScan` overrides nothing and relies on the trait's provided defaults.
impl ExprRewritable<Batch> for BatchKafkaScan {}
142
// Empty impl: `BatchKafkaScan` overrides nothing and relies on the trait's provided defaults.
impl ExprVisitable for BatchKafkaScan {}