1use std::collections::HashMap;
16use std::ops::Deref;
17use std::rc::Rc;
18
19use either::Either;
20use itertools::Itertools;
21use risingwave_common::catalog::{
22 ColumnCatalog, Engine, Field, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, Schema,
23};
24use risingwave_common::types::{DataType, Interval, ScalarImpl};
25use risingwave_common::{bail, bail_not_implemented};
26use risingwave_sqlparser::ast::AsOf;
27
28use crate::binder::{
29 BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundShareInput, BoundSource,
30 BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind,
31};
32use crate::error::{ErrorCode, Result};
33use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
34use crate::optimizer::plan_node::generic::SourceNodeKind;
35use crate::optimizer::plan_node::{
36 LogicalApply, LogicalCteRef, LogicalHopWindow, LogicalJoin, LogicalProject, LogicalScan,
37 LogicalShare, LogicalSource, LogicalSysScan, LogicalTableFunction, LogicalValues, PlanRef,
38};
39use crate::optimizer::property::Cardinality;
40use crate::planner::{PlanFor, Planner};
41use crate::utils::Condition;
42
/// Bind-error message reported by the window table function planners (`tumble` / `hop`)
/// whenever their size/slide/offset arguments do not have the expected literal shape.
const ERROR_WINDOW_SIZE_ARG: &str =
    "The size arg of window table function should be an interval literal.";
45
46impl Planner {
47 pub fn plan_relation(&mut self, relation: Relation) -> Result<PlanRef> {
48 match relation {
49 Relation::BaseTable(t) => self.plan_base_table(&t),
50 Relation::SystemTable(st) => self.plan_sys_table(*st),
51 Relation::Subquery(q) => Ok(self.plan_query(q.query)?.into_unordered_subplan()),
53 Relation::Join(join) => self.plan_join(*join),
54 Relation::Apply(join) => self.plan_apply(*join),
55 Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf),
56 Relation::Source(s) => self.plan_source(*s),
57 Relation::TableFunction {
58 expr: tf,
59 with_ordinality,
60 } => self.plan_table_function(tf, with_ordinality),
61 Relation::Watermark(tf) => self.plan_watermark(*tf),
62 Relation::Share(share) => self.plan_share(*share),
64 Relation::BackCteRef(cte_ref) => self.plan_cte_ref(*cte_ref),
65 }
66 }
67
68 pub(crate) fn plan_sys_table(&mut self, sys_table: BoundSystemTable) -> Result<PlanRef> {
69 Ok(LogicalSysScan::create(
70 sys_table.sys_table_catalog.name().to_owned(),
71 Rc::new(sys_table.sys_table_catalog.table_desc()),
72 self.ctx(),
73 Cardinality::unknown(), )
75 .into())
76 }
77
78 pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result<PlanRef> {
79 let as_of = base_table.as_of.clone();
80 let table_cardinality = base_table.table_catalog.cardinality;
81 let scan = LogicalScan::create(
82 base_table.table_catalog.name().to_owned(),
83 base_table.table_catalog.clone(),
84 base_table
85 .table_indexes
86 .iter()
87 .map(|x| x.as_ref().clone().into())
88 .collect(),
89 self.ctx(),
90 as_of.clone(),
91 table_cardinality,
92 );
93
94 match (base_table.table_catalog.engine, self.plan_for()) {
95 (Engine::Hummock, PlanFor::Stream)
96 | (Engine::Hummock, PlanFor::Batch)
97 | (Engine::Hummock, PlanFor::BatchDql) => {
98 match as_of {
99 None
100 | Some(AsOf::ProcessTime)
101 | Some(AsOf::TimestampNum(_))
102 | Some(AsOf::TimestampString(_))
103 | Some(AsOf::ProcessTimeWithInterval(_)) => {}
104 Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
105 bail_not_implemented!("As Of Version is not supported yet.")
106 }
107 };
108 Ok(scan.into())
109 }
110 (Engine::Iceberg, PlanFor::Stream) | (Engine::Iceberg, PlanFor::Batch) => {
111 match as_of {
112 None
113 | Some(AsOf::VersionNum(_))
114 | Some(AsOf::TimestampString(_))
115 | Some(AsOf::TimestampNum(_)) => {}
116 Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
117 bail_not_implemented!("As Of ProcessTime() is not supported yet.")
118 }
119 Some(AsOf::VersionString(_)) => {
120 bail_not_implemented!("As Of Version is not supported yet.")
121 }
122 }
123 Ok(scan.into())
124 }
125 (Engine::Iceberg, PlanFor::BatchDql) => {
126 match as_of {
127 None
128 | Some(AsOf::VersionNum(_))
129 | Some(AsOf::TimestampString(_))
130 | Some(AsOf::TimestampNum(_)) => {}
131 Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
132 bail_not_implemented!("As Of ProcessTime() is not supported yet.")
133 }
134 Some(AsOf::VersionString(_)) => {
135 bail_not_implemented!("As Of Version is not supported yet.")
136 }
137 }
138 let opt_ctx = self.ctx();
139 let session = opt_ctx.session_ctx();
140 let db_name = &session.database();
141 let catalog_reader = session.env().catalog_reader().read_guard();
142 let mut source_catalog = None;
143 for schema in catalog_reader.iter_schemas(db_name).unwrap() {
144 if schema
145 .get_table_by_id(&base_table.table_catalog.id)
146 .is_some()
147 {
148 source_catalog = schema.get_source_by_name(
149 &base_table.table_catalog.iceberg_source_name().unwrap(),
150 );
151 break;
152 }
153 }
154 if let Some(source_catalog) = source_catalog {
155 let column_map: HashMap<String, (usize, ColumnCatalog)> = source_catalog
156 .columns
157 .clone()
158 .into_iter()
159 .enumerate()
160 .map(|(i, column)| (column.name().to_owned(), (i, column)))
161 .collect();
162 let exprs = scan
163 .table_catalog()
164 .column_schema()
165 .fields()
166 .iter()
167 .map(|field| {
168 let source_filed_name = if field.name == ROW_ID_COLUMN_NAME {
169 RISINGWAVE_ICEBERG_ROW_ID
170 } else {
171 &field.name
172 };
173 if let Some((i, source_column)) = column_map.get(source_filed_name) {
174 if source_column.column_desc.data_type == field.data_type {
175 ExprImpl::InputRef(
176 InputRef::new(*i, field.data_type.clone()).into(),
177 )
178 } else {
179 let mut input_ref = ExprImpl::InputRef(
180 InputRef::new(
181 *i,
182 source_column.column_desc.data_type.clone(),
183 )
184 .into(),
185 );
186 FunctionCall::cast_mut(
187 &mut input_ref,
188 field.data_type().clone(),
189 CastContext::Explicit,
190 )
191 .unwrap();
192 input_ref
193 }
194 } else {
195 ExprImpl::Literal(
197 Literal::new(None, field.data_type.clone()).into(),
198 )
199 }
200 })
201 .collect_vec();
202 let logical_source = LogicalSource::with_catalog(
203 Rc::new(source_catalog.deref().clone()),
204 SourceNodeKind::CreateMViewOrBatch,
205 self.ctx(),
206 as_of,
207 )?;
208 Ok(LogicalProject::new(logical_source.into(), exprs).into())
209 } else {
210 bail!(
211 "failed to plan a iceberg engine table: {}. Can't find the corresponding iceberg source. Maybe you need to recreate the table",
212 base_table.table_catalog.name()
213 );
214 }
215 }
216 }
217 }
218
219 pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
220 if source.is_shareable_cdc_connector() {
221 Err(ErrorCode::InternalError(
222 "Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source. HINT: create TABLE from the source instead.".to_owned(),
223 )
224 .into())
225 } else {
226 let as_of = source.as_of.clone();
227 match as_of {
228 None
229 | Some(AsOf::VersionNum(_))
230 | Some(AsOf::TimestampString(_))
231 | Some(AsOf::TimestampNum(_)) => {}
232 Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
233 bail_not_implemented!("As Of ProcessTime() is not supported yet.")
234 }
235 Some(AsOf::VersionString(_)) => {
236 bail_not_implemented!("As Of Version is not supported yet.")
237 }
238 }
239 Ok(LogicalSource::with_catalog(
240 Rc::new(source.catalog),
241 SourceNodeKind::CreateMViewOrBatch,
242 self.ctx(),
243 as_of,
244 )?
245 .into())
246 }
247 }
248
249 pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
250 let left = self.plan_relation(join.left)?;
251 let right = self.plan_relation(join.right)?;
252 let join_type = join.join_type;
253 let on_clause = join.cond;
254 if on_clause.has_subquery() {
255 bail_not_implemented!("Subquery in join on condition");
256 } else {
257 Ok(LogicalJoin::create(left, right, join_type, on_clause))
258 }
259 }
260
261 pub(super) fn plan_apply(&mut self, mut join: BoundJoin) -> Result<PlanRef> {
262 let join_type = join.join_type;
263 let on_clause = join.cond;
264 if on_clause.has_subquery() {
265 bail_not_implemented!("Subquery in join on condition");
266 }
267
268 let correlated_id = self.ctx.next_correlated_id();
269 let correlated_indices = join
270 .right
271 .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
272 let left = self.plan_relation(join.left)?;
273 let right = self.plan_relation(join.right)?;
274
275 Ok(LogicalApply::create(
276 left,
277 right,
278 join_type,
279 Condition::with_expr(on_clause),
280 correlated_id,
281 correlated_indices,
282 false,
283 ))
284 }
285
286 pub(super) fn plan_window_table_function(
287 &mut self,
288 table_function: BoundWindowTableFunction,
289 ) -> Result<PlanRef> {
290 use WindowTableFunctionKind::*;
291 match table_function.kind {
292 Tumble => self.plan_tumble_window(
293 table_function.input,
294 table_function.time_col,
295 table_function.args,
296 ),
297 Hop => self.plan_hop_window(
298 table_function.input,
299 table_function.time_col,
300 table_function.args,
301 ),
302 }
303 }
304
305 pub(super) fn plan_table_function(
306 &mut self,
307 table_function: ExprImpl,
308 with_ordinality: bool,
309 ) -> Result<PlanRef> {
310 match table_function {
312 ExprImpl::TableFunction(tf) => {
313 Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into())
314 }
315 expr => {
316 let schema = Schema {
317 fields: vec![Field::unnamed(expr.return_type())],
319 };
320 let expr_return_type = expr.return_type();
321 let root = LogicalValues::create(vec![vec![expr]], schema, self.ctx());
322 let input_ref = ExprImpl::from(InputRef::new(0, expr_return_type.clone()));
323 let mut exprs = if let DataType::Struct(st) = expr_return_type {
324 st.iter()
325 .enumerate()
326 .map(|(i, (_, ty))| {
327 let idx = ExprImpl::literal_int(i.try_into().unwrap());
328 let args = vec![input_ref.clone(), idx];
329 FunctionCall::new_unchecked(ExprType::Field, args, ty.clone()).into()
330 })
331 .collect()
332 } else {
333 vec![input_ref]
334 };
335 if with_ordinality {
336 exprs.push(ExprImpl::literal_bigint(1));
337 }
338 Ok(LogicalProject::create(root, exprs))
339 }
340 }
341 }
342
343 pub(super) fn plan_share(&mut self, share: BoundShare) -> Result<PlanRef> {
344 match share.input {
345 BoundShareInput::Query(Either::Left(nonrecursive_query)) => {
346 let id = share.share_id;
347 match self.share_cache.get(&id) {
348 None => {
349 let result = self
350 .plan_query(nonrecursive_query)?
351 .into_unordered_subplan();
352 let logical_share = LogicalShare::create(result);
353 self.share_cache.insert(id, logical_share.clone());
354 Ok(logical_share)
355 }
356 Some(result) => Ok(result.clone()),
357 }
358 }
359 BoundShareInput::Query(Either::Right(recursive_union)) => self.plan_recursive_union(
361 *recursive_union.base,
362 *recursive_union.recursive,
363 share.share_id,
364 ),
365 BoundShareInput::ChangeLog(relation) => {
366 let id = share.share_id;
367 let result = self.plan_changelog(relation)?;
368 let logical_share = LogicalShare::create(result);
369 self.share_cache.insert(id, logical_share.clone());
370 Ok(logical_share)
371 }
372 }
373 }
374
375 pub(super) fn plan_watermark(&mut self, _watermark: BoundWatermark) -> Result<PlanRef> {
376 todo!("plan watermark");
377 }
378
379 pub(super) fn plan_cte_ref(&mut self, cte_ref: BoundBackCteRef) -> Result<PlanRef> {
380 let base = self.plan_set_expr(cte_ref.base, vec![], &[])?;
382 Ok(LogicalCteRef::create(cte_ref.share_id, base))
383 }
384
385 fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result<Vec<DataType>> {
386 let col_data_types = match relation {
387 Relation::Source(s) => s
388 .catalog
389 .columns
390 .iter()
391 .map(|col| col.data_type().clone())
392 .collect(),
393 Relation::BaseTable(t) => t
394 .table_catalog
395 .columns
396 .iter()
397 .map(|col| col.data_type().clone())
398 .collect(),
399 Relation::Subquery(q) => q.query.schema().data_types(),
400 Relation::Share(share) => share
401 .input
402 .fields()?
403 .into_iter()
404 .map(|(_, f)| f.data_type)
405 .collect(),
406 r => {
407 return Err(ErrorCode::BindError(format!(
408 "Invalid input relation to tumble: {r:?}"
409 ))
410 .into());
411 }
412 };
413 Ok(col_data_types)
414 }
415
416 fn plan_tumble_window(
417 &mut self,
418 input: Relation,
419 time_col: InputRef,
420 args: Vec<ExprImpl>,
421 ) -> Result<PlanRef> {
422 let mut args = args.into_iter();
423 let col_data_types: Vec<_> = Self::collect_col_data_types_for_tumble_window(&input)?;
424
425 match (args.next(), args.next(), args.next()) {
426 (Some(window_size @ ExprImpl::Literal(_)), None, None) => {
427 let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
428 for (idx, col_dt) in col_data_types.iter().enumerate() {
429 exprs.push(InputRef::new(idx, col_dt.clone()).into());
430 }
431 let window_start: ExprImpl = FunctionCall::new(
432 ExprType::TumbleStart,
433 vec![ExprImpl::InputRef(Box::new(time_col)), window_size.clone()],
434 )?
435 .into();
436 let window_end =
440 FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
441 .into();
442 exprs.push(window_start);
443 exprs.push(window_end);
444 let base = self.plan_relation(input)?;
445 let project = LogicalProject::create(base, exprs);
446 Ok(project)
447 }
448 (
449 Some(window_size @ ExprImpl::Literal(_)),
450 Some(window_offset @ ExprImpl::Literal(_)),
451 None,
452 ) => {
453 let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
454 for (idx, col_dt) in col_data_types.iter().enumerate() {
455 exprs.push(InputRef::new(idx, col_dt.clone()).into());
456 }
457 let window_start: ExprImpl = FunctionCall::new(
458 ExprType::TumbleStart,
459 vec![
460 ExprImpl::InputRef(Box::new(time_col)),
461 window_size.clone(),
462 window_offset,
463 ],
464 )?
465 .into();
466 let window_end =
470 FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
471 .into();
472 exprs.push(window_start);
473 exprs.push(window_end);
474 let base = self.plan_relation(input)?;
475 let project = LogicalProject::create(base, exprs);
476 Ok(project)
477 }
478 _ => Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
479 }
480 }
481
482 fn plan_hop_window(
483 &mut self,
484 input: Relation,
485 time_col: InputRef,
486 args: Vec<ExprImpl>,
487 ) -> Result<PlanRef> {
488 let input = self.plan_relation(input)?;
489 let mut args = args.into_iter();
490 let Some((ExprImpl::Literal(window_slide), ExprImpl::Literal(window_size))) =
491 args.next_tuple()
492 else {
493 return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
494 };
495
496 let Some(ScalarImpl::Interval(window_slide)) = *window_slide.get_data() else {
497 return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
498 };
499 let Some(ScalarImpl::Interval(window_size)) = *window_size.get_data() else {
500 return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
501 };
502
503 let window_offset = match (args.next(), args.next()) {
504 (Some(ExprImpl::Literal(window_offset)), None) => match *window_offset.get_data() {
505 Some(ScalarImpl::Interval(window_offset)) => window_offset,
506 _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
507 },
508 (None, None) => Interval::from_month_day_usec(0, 0, 0),
509 _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
510 };
511
512 if !window_size.is_positive() || !window_slide.is_positive() {
513 return Err(ErrorCode::BindError(format!(
514 "window_size {} and window_slide {} must be positive",
515 window_size, window_slide
516 ))
517 .into());
518 }
519
520 if window_size.exact_div(&window_slide).is_none() {
521 return Err(ErrorCode::BindError(format!("Invalid arguments for HOP window function: window_size {} cannot be divided by window_slide {}",window_size, window_slide)).into());
522 }
523
524 Ok(LogicalHopWindow::create(
525 input,
526 time_col,
527 window_slide,
528 window_size,
529 window_offset,
530 ))
531 }
532}