risingwave_frontend/optimizer/plan_node/
batch_kafka_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound;
16use std::ops::Bound::{Excluded, Included, Unbounded};
17use std::rc::Rc;
18
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_pb::batch_plan::SourceNode;
21use risingwave_pb::batch_plan::plan_node::NodeBody;
22
23use super::batch::prelude::*;
24use super::utils::{Distill, childless_record, column_names_pretty};
25use super::{
26    ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch, generic,
27};
28use crate::catalog::source_catalog::SourceCatalog;
29use crate::error::Result;
30use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
31use crate::optimizer::property::{Distribution, Order};
32
/// Batch plan node that scans a Kafka-backed source, optionally restricted to a
/// range of Kafka message timestamps.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchKafkaScan {
    /// Batch-convention plan base (distribution, order, ...) derived from `core`.
    pub base: PlanBase<Batch>,
    /// Generic source description shared with the other source plan nodes.
    pub core: generic::Source,

    /// Kafka timestamp range, as `(lower bound, upper bound)`.
    // NOTE(review): the unit is not visible in this file — presumably
    // milliseconds since the Unix epoch; confirm against the producer of this range.
    kafka_timestamp_range: (Bound<i64>, Bound<i64>),
}
41
42impl BatchKafkaScan {
43    pub fn new(core: generic::Source, kafka_timestamp_range: (Bound<i64>, Bound<i64>)) -> Self {
44        let base = PlanBase::new_batch_with_core(
45            &core,
46            // Use `Single` by default, will be updated later with `clone_with_dist`.
47            Distribution::Single,
48            Order::any(),
49        );
50
51        Self {
52            base,
53            core,
54            kafka_timestamp_range,
55        }
56    }
57
58    pub fn column_names(&self) -> Vec<&str> {
59        self.schema().names_str()
60    }
61
62    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
63        self.core.catalog.clone()
64    }
65
66    pub fn kafka_timestamp_range_value(&self) -> (Option<i64>, Option<i64>) {
67        let (lower_bound, upper_bound) = &self.kafka_timestamp_range;
68        let lower_bound = match lower_bound {
69            Included(t) => Some(*t),
70            Excluded(t) => Some(*t - 1),
71            Unbounded => None,
72        };
73
74        let upper_bound = match upper_bound {
75            Included(t) => Some(*t),
76            Excluded(t) => Some(*t + 1),
77            Unbounded => None,
78        };
79        (lower_bound, upper_bound)
80    }
81
82    pub fn clone_with_dist(&self) -> Self {
83        let base = self
84            .base
85            .clone_with_new_distribution(Distribution::SomeShard);
86        Self {
87            base,
88            core: self.core.clone(),
89            kafka_timestamp_range: self.kafka_timestamp_range,
90        }
91    }
92}
93
// NOTE(review): the macro definition is not visible here; judging by its name it
// generates the plan-tree-node impls for a node that has no inputs — confirm.
impl_plan_tree_node_for_leaf! { BatchKafkaScan }
95
96impl Distill for BatchKafkaScan {
97    fn distill<'a>(&self) -> XmlNode<'a> {
98        let src = Pretty::from(self.source_catalog().unwrap().name.clone());
99        let fields = vec![
100            ("source", src),
101            ("columns", column_names_pretty(self.schema())),
102            ("filter", Pretty::debug(&self.kafka_timestamp_range_value())),
103        ];
104        childless_record("BatchKafkaScan", fields)
105    }
106}
107
108impl ToLocalBatch for BatchKafkaScan {
109    fn to_local(&self) -> Result<PlanRef> {
110        Ok(self.clone_with_dist().into())
111    }
112}
113
114impl ToDistributedBatch for BatchKafkaScan {
115    fn to_distributed(&self) -> Result<PlanRef> {
116        Ok(self.clone_with_dist().into())
117    }
118}
119
120impl ToBatchPb for BatchKafkaScan {
121    fn to_batch_prost_body(&self) -> NodeBody {
122        let source_catalog = self.source_catalog().unwrap();
123        let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
124        NodeBody::Source(SourceNode {
125            source_id: source_catalog.id,
126            info: Some(source_catalog.info.clone()),
127            columns: self
128                .core
129                .column_catalog
130                .iter()
131                .map(|c| c.to_protobuf())
132                .collect(),
133            with_properties,
134            split: vec![],
135            secret_refs,
136        })
137    }
138}
139
// Empty impl: every `ExprRewritable` method keeps the trait's default.
// NOTE(review): presumably because this leaf node holds no expressions — confirm.
impl ExprRewritable for BatchKafkaScan {}
141
// Empty impl: every `ExprVisitable` method keeps the trait's default.
// NOTE(review): presumably because this leaf node holds no expressions — confirm.
impl ExprVisitable for BatchKafkaScan {}