risingwave_frontend/optimizer/plan_node/
logical_kafka_scan.rs1use std::cmp::{max, min};
16use std::ops::Bound;
17use std::ops::Bound::{Excluded, Included, Unbounded};
18use std::rc::Rc;
19
20use pretty_xmlish::{Pretty, XmlNode};
21use risingwave_common::catalog::{KAFKA_TIMESTAMP_COLUMN_NAME, Schema};
22use risingwave_common::types::DataType;
23
24use super::generic::GenericPlanRef;
25use super::utils::{Distill, childless_record};
26use super::{
27 ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject, PlanBase, PlanRef,
28 PredicatePushdown, ToBatch, ToStream, generic,
29};
30use crate::catalog::source_catalog::SourceCatalog;
31use crate::error::Result;
32use crate::expr::{Expr, ExprImpl, ExprType};
33use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
34use crate::optimizer::plan_node::utils::column_names_pretty;
35use crate::optimizer::plan_node::{
36 BatchKafkaScan, ColumnPruningContext, LogicalSource, PredicatePushdownContext,
37 RewriteStreamContext, ToStreamContext,
38};
39use crate::utils::{ColIndexMapping, Condition};
40
/// Logical plan node for a scan over a Kafka source, optionally restricted to
/// a range of Kafka message timestamps.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalKafkaScan {
    /// Logical plan base; `create` derives it from `core`.
    pub base: PlanBase<Logical>,
    /// Source description, cloned from the originating `LogicalSource`.
    pub core: generic::Source,

    /// `(lower, upper)` bounds on the Kafka timestamp column. Starts fully
    /// unbounded and is tightened by predicate pushdown; the values stored are
    /// the results of `timestamp_millis()` on the compared constants.
    kafka_timestamp_range: (Bound<i64>, Bound<i64>),
}
50
51impl LogicalKafkaScan {
52 pub fn create(logical_source: &LogicalSource) -> PlanRef {
53 assert!(logical_source.core.is_kafka_connector());
54
55 let core = logical_source.core.clone();
56 let base = PlanBase::new_logical_with_core(&core);
57 let kafka_timestamp_range = (Bound::Unbounded, Bound::Unbounded);
58
59 let kafka_scan = LogicalKafkaScan {
60 base,
61 core,
62 kafka_timestamp_range,
63 };
64
65 if let Some(exprs) = &logical_source.output_exprs {
66 LogicalProject::create(kafka_scan.into(), exprs.to_vec())
67 } else {
68 kafka_scan.into()
69 }
70 }
71
72 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
73 self.core.catalog.clone()
74 }
75
76 fn clone_with_kafka_timestamp_range(&self, range: (Bound<i64>, Bound<i64>)) -> Self {
77 Self {
78 base: self.base.clone(),
79 core: self.core.clone(),
80 kafka_timestamp_range: range,
81 }
82 }
83}
84
// NOTE(review): the macro is defined outside this file; judging by its name it
// generates the plan-tree-node impls for a node without children — confirm
// against the macro definition.
impl_plan_tree_node_for_leaf! {LogicalKafkaScan}
86impl Distill for LogicalKafkaScan {
87 fn distill<'a>(&self) -> XmlNode<'a> {
88 let fields = if let Some(catalog) = self.source_catalog() {
89 let src = Pretty::from(catalog.name.clone());
90 let time = Pretty::debug(&self.kafka_timestamp_range);
91 vec![
92 ("source", src),
93 ("columns", column_names_pretty(self.schema())),
94 ("time_range", time),
95 ]
96 } else {
97 vec![]
98 };
99 childless_record("LogicalKafkaScan", fields)
100 }
101}
102
103impl ColPrunable for LogicalKafkaScan {
104 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
105 let mapping = ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
106 LogicalProject::with_mapping(self.clone().into(), mapping).into()
107 }
108}
109
// Nothing is overridden here, so whatever defaults `ExprRewritable` provides apply.
impl ExprRewritable for LogicalKafkaScan {}
111
// Nothing is overridden here, so whatever defaults `ExprVisitable` provides apply.
impl ExprVisitable for LogicalKafkaScan {}
113
114fn expr_to_kafka_timestamp_range(
130 expr: ExprImpl,
131 range: &mut (Bound<i64>, Bound<i64>),
132 schema: &Schema,
133) -> Option<ExprImpl> {
134 let merge_upper_bound = |first, second| -> Bound<i64> {
135 match (first, second) {
136 (first, Unbounded) => first,
137 (Unbounded, second) => second,
138 (Included(f1), Included(f2)) => Included(min(f1, f2)),
139 (Included(f1), Excluded(f2)) => {
140 if f1 < f2 {
141 Included(f1)
142 } else {
143 Excluded(f2)
144 }
145 }
146 (Excluded(f1), Included(f2)) => {
147 if f2 < f1 {
148 Included(f2)
149 } else {
150 Excluded(f1)
151 }
152 }
153 (Excluded(f1), Excluded(f2)) => Excluded(min(f1, f2)),
154 }
155 };
156
157 let merge_lower_bound = |first, second| -> Bound<i64> {
158 match (first, second) {
159 (first, Unbounded) => first,
160 (Unbounded, second) => second,
161 (Included(f1), Included(f2)) => Included(max(f1, f2)),
162 (Included(f1), Excluded(f2)) => {
163 if f1 > f2 {
164 Included(f1)
165 } else {
166 Excluded(f2)
167 }
168 }
169 (Excluded(f1), Included(f2)) => {
170 if f2 > f1 {
171 Included(f2)
172 } else {
173 Excluded(f1)
174 }
175 }
176 (Excluded(f1), Excluded(f2)) => Excluded(max(f1, f2)),
177 }
178 };
179
180 let extract_timestampz_literal = |expr: &ExprImpl| -> Result<Option<(i64, bool)>> {
181 match expr {
182 ExprImpl::FunctionCall(function_call) if function_call.inputs().len() == 2 => {
183 match (&function_call.inputs()[0], &function_call.inputs()[1]) {
184 (ExprImpl::InputRef(input_ref), literal)
185 if let Some(datum) = literal.try_fold_const().transpose()?
186 && schema.fields[input_ref.index].name
187 == KAFKA_TIMESTAMP_COLUMN_NAME
188 && literal.return_type() == DataType::Timestamptz =>
189 {
190 Ok(Some((
191 datum.unwrap().into_timestamptz().timestamp_millis(),
192 false,
193 )))
194 }
195 (literal, ExprImpl::InputRef(input_ref))
196 if let Some(datum) = literal.try_fold_const().transpose()?
197 && schema.fields[input_ref.index].name
198 == KAFKA_TIMESTAMP_COLUMN_NAME
199 && literal.return_type() == DataType::Timestamptz =>
200 {
201 Ok(Some((
202 datum.unwrap().into_timestamptz().timestamp_millis(),
203 true,
204 )))
205 }
206 _ => Ok(None),
207 }
208 }
209 _ => Ok(None),
210 }
211 };
212
213 match &expr {
214 ExprImpl::FunctionCall(function_call) => {
215 if let Ok(Some((timestampz_literal, reverse))) = extract_timestampz_literal(&expr) {
216 match function_call.func_type() {
217 ExprType::GreaterThan => {
218 if reverse {
219 range.1 = merge_upper_bound(range.1, Excluded(timestampz_literal));
220 } else {
221 range.0 = merge_lower_bound(range.0, Excluded(timestampz_literal));
222 }
223
224 None
225 }
226 ExprType::GreaterThanOrEqual => {
227 if reverse {
228 range.1 = merge_upper_bound(range.1, Included(timestampz_literal));
229 } else {
230 range.0 = merge_lower_bound(range.0, Included(timestampz_literal));
231 }
232 None
233 }
234 ExprType::Equal => {
235 range.0 = merge_lower_bound(range.0, Included(timestampz_literal));
236 range.1 = merge_upper_bound(range.1, Included(timestampz_literal));
237 None
238 }
239 ExprType::LessThan => {
240 if reverse {
241 range.0 = merge_lower_bound(range.0, Excluded(timestampz_literal));
242 } else {
243 range.1 = merge_upper_bound(range.1, Excluded(timestampz_literal));
244 }
245 None
246 }
247 ExprType::LessThanOrEqual => {
248 if reverse {
249 range.0 = merge_lower_bound(range.0, Included(timestampz_literal));
250 } else {
251 range.1 = merge_upper_bound(range.1, Included(timestampz_literal));
252 }
253 None
254 }
255 _ => Some(expr),
256 }
257 } else {
258 Some(expr)
259 }
260 }
261 _ => Some(expr),
262 }
263}
264
265impl PredicatePushdown for LogicalKafkaScan {
266 fn predicate_pushdown(
267 &self,
268 predicate: Condition,
269 _ctx: &mut PredicatePushdownContext,
270 ) -> PlanRef {
271 let mut range = self.kafka_timestamp_range;
272
273 let mut new_conjunctions = Vec::with_capacity(predicate.conjunctions.len());
274 for expr in predicate.conjunctions {
275 if let Some(e) = expr_to_kafka_timestamp_range(expr, &mut range, self.base.schema()) {
276 new_conjunctions.push(e);
278 }
279 }
280
281 let new_source = self.clone_with_kafka_timestamp_range(range).into();
282
283 if new_conjunctions.is_empty() {
284 new_source
285 } else {
286 LogicalFilter::create(
287 new_source,
288 Condition {
289 conjunctions: new_conjunctions,
290 },
291 )
292 }
293 }
294}
295
296impl ToBatch for LogicalKafkaScan {
297 fn to_batch(&self) -> Result<PlanRef> {
298 let plan: PlanRef =
299 BatchKafkaScan::new(self.core.clone(), self.kafka_timestamp_range).into();
300 Ok(plan)
301 }
302}
303
// Both methods panic via `unreachable!()` if they are ever called.
// NOTE(review): presumably this node only ever appears in batch plans —
// confirm against the call sites of `LogicalKafkaScan::create`.
impl ToStream for LogicalKafkaScan {
    /// Not supported for this node; panics if called.
    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
        unreachable!()
    }

    /// Not supported for this node; panics if called.
    fn logical_rewrite_for_stream(
        &self,
        _ctx: &mut RewriteStreamContext,
    ) -> Result<(PlanRef, ColIndexMapping)> {
        unreachable!()
    }
}