risingwave_frontend/optimizer/plan_node/logical_kafka_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{max, min};
use std::ops::Bound;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::rc::Rc;

use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{KAFKA_TIMESTAMP_COLUMN_NAME, Schema};
use risingwave_common::types::DataType;

use super::generic::GenericPlanRef;
use super::utils::{Distill, childless_record};
use super::{
    ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject, PlanBase, PlanRef,
    PredicatePushdown, ToBatch, ToStream, generic,
};
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::Result;
use crate::expr::{Expr, ExprImpl, ExprType};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::column_names_pretty;
use crate::optimizer::plan_node::{
    BatchKafkaScan, ColumnPruningContext, LogicalSource, PredicatePushdownContext,
    RewriteStreamContext, ToStreamContext,
};
use crate::utils::{ColIndexMapping, Condition};

/// `LogicalKafkaScan` is only used by batch queries. At the beginning of batch query
/// optimization, a `LogicalSource` with a Kafka connector is converted into a
/// `LogicalKafkaScan`.
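///
/// For example, in a batch query such as
/// `SELECT * FROM t WHERE _rw_kafka_timestamp > '2022-10-11 01:00:00+00:00'`
/// (`t` being a Kafka source), the timestamp comparison can be folded into the scan's
/// `kafka_timestamp_range` via `PredicatePushdown` below.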
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalKafkaScan {
    pub base: PlanBase<Logical>,
    pub core: generic::Source,

    /// Kafka timestamp range.
    kafka_timestamp_range: (Bound<i64>, Bound<i64>),
}

impl LogicalKafkaScan {
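    /// Convert a Kafka `LogicalSource` into a `LogicalKafkaScan` with an initially
    /// unbounded timestamp range. If the source carries output expressions, the scan
    /// is wrapped in a `LogicalProject` that applies them.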
    pub fn create(logical_source: &LogicalSource) -> PlanRef {
        assert!(logical_source.core.is_kafka_connector());

        let core = logical_source.core.clone();
        let base = PlanBase::new_logical_with_core(&core);
        let kafka_timestamp_range = (Bound::Unbounded, Bound::Unbounded);

        let kafka_scan = LogicalKafkaScan {
            base,
            core,
            kafka_timestamp_range,
        };

        if let Some(exprs) = &logical_source.output_exprs {
            LogicalProject::create(kafka_scan.into(), exprs.to_vec())
        } else {
            kafka_scan.into()
        }
    }

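    /// Returns the catalog of the underlying source, if available.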
    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
        self.core.catalog.clone()
    }

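    /// Clone this node, replacing only the Kafka timestamp range.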
    fn clone_with_kafka_timestamp_range(&self, range: (Bound<i64>, Bound<i64>)) -> Self {
        Self {
            base: self.base.clone(),
            core: self.core.clone(),
            kafka_timestamp_range: range,
        }
    }
}

impl_plan_tree_node_for_leaf! {LogicalKafkaScan}
impl Distill for LogicalKafkaScan {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let fields = if let Some(catalog) = self.source_catalog() {
            let src = Pretty::from(catalog.name.clone());
            let time = Pretty::debug(&self.kafka_timestamp_range);
            vec![
                ("source", src),
                ("columns", column_names_pretty(self.schema())),
                ("time_range", time),
            ]
        } else {
            vec![]
        };
        childless_record("LogicalKafkaScan", fields)
    }
}

impl ColPrunable for LogicalKafkaScan {
    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
        let mapping = ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
        LogicalProject::with_mapping(self.clone().into(), mapping).into()
    }
}

impl ExprRewritable for LogicalKafkaScan {}

impl ExprVisitable for LogicalKafkaScan {}

/// A utility function that extracts the Kafka timestamp range from a predicate expression.
///
/// Currently we only support bounding the Kafka timestamp with literals, i.e. only
/// expressions like `_rw_kafka_timestamp <= '2022-10-11 1:00:00+00:00'` are recognized.
///
/// # Parameters
///
/// * `expr`: Expression to be consumed.
/// * `range`: The original timestamp range; if `expr` is recognized, `range` is tightened in place.
/// * `schema`: Input schema.
///
/// # Return Value
///
/// If `expr` is recognized and consumed by this function, `None` is returned.
/// Otherwise `expr` is returned unchanged.
fn expr_to_kafka_timestamp_range(
    expr: ExprImpl,
    range: &mut (Bound<i64>, Bound<i64>),
    schema: &Schema,
) -> Option<ExprImpl> {
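    // Tighten an upper bound: the smaller value wins, `Unbounded` never beats a concrete
    // bound, and at the same value an `Excluded` bound is tighter than an `Included` one.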
    let merge_upper_bound = |first, second| -> Bound<i64> {
        match (first, second) {
            (first, Unbounded) => first,
            (Unbounded, second) => second,
            (Included(f1), Included(f2)) => Included(min(f1, f2)),
            (Included(f1), Excluded(f2)) => {
                if f1 < f2 {
                    Included(f1)
                } else {
                    Excluded(f2)
                }
            }
            (Excluded(f1), Included(f2)) => {
                if f2 < f1 {
                    Included(f2)
                } else {
                    Excluded(f1)
                }
            }
            (Excluded(f1), Excluded(f2)) => Excluded(min(f1, f2)),
        }
    };

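    // Symmetric to `merge_upper_bound`: tighten a lower bound, so the larger value wins.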
    let merge_lower_bound = |first, second| -> Bound<i64> {
        match (first, second) {
            (first, Unbounded) => first,
            (Unbounded, second) => second,
            (Included(f1), Included(f2)) => Included(max(f1, f2)),
            (Included(f1), Excluded(f2)) => {
                if f1 > f2 {
                    Included(f1)
                } else {
                    Excluded(f2)
                }
            }
            (Excluded(f1), Included(f2)) => {
                if f2 > f1 {
                    Included(f2)
                } else {
                    Excluded(f1)
                }
            }
            (Excluded(f1), Excluded(f2)) => Excluded(max(f1, f2)),
        }
    };

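    // Extract `(timestamp_millis, reverse)` from a binary comparison between the
    // `_rw_kafka_timestamp` column and a constant `Timestamptz` literal. `reverse` is
    // `true` when the literal is on the left-hand side, i.e. the comparison is flipped.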
    let extract_timestampz_literal = |expr: &ExprImpl| -> Result<Option<(i64, bool)>> {
        match expr {
            ExprImpl::FunctionCall(function_call) if function_call.inputs().len() == 2 => {
                match (&function_call.inputs()[0], &function_call.inputs()[1]) {
                    (ExprImpl::InputRef(input_ref), literal)
                        if let Some(datum) = literal.try_fold_const().transpose()?
                            && schema.fields[input_ref.index].name
                                == KAFKA_TIMESTAMP_COLUMN_NAME
                            && literal.return_type() == DataType::Timestamptz =>
                    {
                        Ok(Some((
                            datum.unwrap().into_timestamptz().timestamp_millis(),
                            false,
                        )))
                    }
                    (literal, ExprImpl::InputRef(input_ref))
                        if let Some(datum) = literal.try_fold_const().transpose()?
                            && schema.fields[input_ref.index].name
                                == KAFKA_TIMESTAMP_COLUMN_NAME
                            && literal.return_type() == DataType::Timestamptz =>
                    {
                        Ok(Some((
                            datum.unwrap().into_timestamptz().timestamp_millis(),
                            true,
                        )))
                    }
                    _ => Ok(None),
                }
            }
            _ => Ok(None),
        }
    };

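    // For each recognized comparison, fold the literal into the range and consume the
    // expression by returning `None`; flip the operator when `reverse` is set.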
    match &expr {
        ExprImpl::FunctionCall(function_call) => {
            if let Ok(Some((timestampz_literal, reverse))) = extract_timestampz_literal(&expr) {
                match function_call.func_type() {
                    ExprType::GreaterThan => {
                        if reverse {
                            range.1 = merge_upper_bound(range.1, Excluded(timestampz_literal));
                        } else {
                            range.0 = merge_lower_bound(range.0, Excluded(timestampz_literal));
                        }
                        None
                    }
                    ExprType::GreaterThanOrEqual => {
                        if reverse {
                            range.1 = merge_upper_bound(range.1, Included(timestampz_literal));
                        } else {
                            range.0 = merge_lower_bound(range.0, Included(timestampz_literal));
                        }
                        None
                    }
                    ExprType::Equal => {
                        range.0 = merge_lower_bound(range.0, Included(timestampz_literal));
                        range.1 = merge_upper_bound(range.1, Included(timestampz_literal));
                        None
                    }
                    ExprType::LessThan => {
                        if reverse {
                            range.0 = merge_lower_bound(range.0, Excluded(timestampz_literal));
                        } else {
                            range.1 = merge_upper_bound(range.1, Excluded(timestampz_literal));
                        }
                        None
                    }
                    ExprType::LessThanOrEqual => {
                        if reverse {
                            range.0 = merge_lower_bound(range.0, Included(timestampz_literal));
                        } else {
                            range.1 = merge_upper_bound(range.1, Included(timestampz_literal));
                        }
                        None
                    }
                    _ => Some(expr),
                }
            } else {
                Some(expr)
            }
        }
        _ => Some(expr),
    }
}

impl PredicatePushdown for LogicalKafkaScan {
    fn predicate_pushdown(
        &self,
        predicate: Condition,
        _ctx: &mut PredicatePushdownContext,
    ) -> PlanRef {
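        // Fold recognized `_rw_kafka_timestamp` comparisons into the scan's timestamp
        // range; conjuncts that are not recognized stay in a `LogicalFilter` on top.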
        let mut range = self.kafka_timestamp_range;

        let mut new_conjunctions = Vec::with_capacity(predicate.conjunctions.len());
        for expr in predicate.conjunctions {
            if let Some(e) = expr_to_kafka_timestamp_range(expr, &mut range, self.base.schema()) {
                // Not recognized as a timestamp bound, so keep it as a residual condition.
                new_conjunctions.push(e);
            }
        }

        let new_source = self.clone_with_kafka_timestamp_range(range).into();

        if new_conjunctions.is_empty() {
            new_source
        } else {
            LogicalFilter::create(
                new_source,
                Condition {
                    conjunctions: new_conjunctions,
                },
            )
        }
    }
}

impl ToBatch for LogicalKafkaScan {
    fn to_batch(&self) -> Result<PlanRef> {
        let plan: PlanRef =
            BatchKafkaScan::new(self.core.clone(), self.kafka_timestamp_range).into();
        Ok(plan)
    }
}

impl ToStream for LogicalKafkaScan {
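    // `LogicalKafkaScan` is only used by batch queries, so the streaming conversions
    // below are never reached.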
    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
        unreachable!()
    }

    fn logical_rewrite_for_stream(
        &self,
        _ctx: &mut RewriteStreamContext,
    ) -> Result<(PlanRef, ColIndexMapping)> {
        unreachable!()
    }
}