use std::collections::HashMap;
use std::ops::Deref;
use std::rc::Rc;

use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, Engine, Field, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, Schema,
};
use risingwave_common::types::{DataType, Interval, ScalarImpl};
use risingwave_common::{bail, bail_not_implemented};
use risingwave_sqlparser::ast::AsOf;

use crate::binder::{
    BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundShareInput, BoundSource,
    BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind,
};
use crate::error::{ErrorCode, Result};
use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{
    LogicalApply, LogicalCteRef, LogicalHopWindow, LogicalJoin, LogicalProject, LogicalScan,
    LogicalShare, LogicalSource, LogicalSysScan, LogicalTableFunction, LogicalValues, PlanRef,
};
use crate::optimizer::property::Cardinality;
use crate::planner::{PlanFor, Planner};
use crate::utils::Condition;

const ERROR_WINDOW_SIZE_ARG: &str =
    "The size arg of the window table function should be an interval literal.";

impl Planner {
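    /// Plans a bound [`Relation`] by dispatching to the planner for the concrete relation kind
    /// (base table, system table, subquery, join, apply, source, table function, share, etc.).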
    pub fn plan_relation(&mut self, relation: Relation) -> Result<PlanRef> {
        match relation {
            Relation::BaseTable(t) => self.plan_base_table(&t),
            Relation::SystemTable(st) => self.plan_sys_table(*st),
            Relation::Subquery(q) => Ok(self.plan_query(q.query)?.into_unordered_subplan()),
            Relation::Join(join) => self.plan_join(*join),
            Relation::Apply(join) => self.plan_apply(*join),
            Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf),
            Relation::Source(s) => self.plan_source(*s),
            Relation::TableFunction {
                expr: tf,
                with_ordinality,
            } => self.plan_table_function(tf, with_ordinality),
            Relation::Watermark(tf) => self.plan_watermark(*tf),
            Relation::Share(share) => self.plan_share(*share),
            Relation::BackCteRef(cte_ref) => self.plan_cte_ref(*cte_ref),
        }
    }

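    /// Plans a system table scan as a `LogicalSysScan` with unknown cardinality.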
    pub(crate) fn plan_sys_table(&mut self, sys_table: BoundSystemTable) -> Result<PlanRef> {
        Ok(LogicalSysScan::create(
            sys_table.sys_table_catalog.name().to_owned(),
            Rc::new(sys_table.sys_table_catalog.table_desc()),
            self.ctx(),
            Cardinality::unknown(),
        )
        .into())
    }

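    /// Plans a scan over a user table.
    ///
    /// Tables on the Hummock engine always become a `LogicalScan`. For tables on the Iceberg
    /// engine, depending on the plan target and whether the table is append-only, the scan is
    /// either kept as a `LogicalScan` or replaced by a read from the table's corresponding
    /// Iceberg source, with a projection that maps source columns back to the table schema.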
    pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result<PlanRef> {
        let as_of = base_table.as_of.clone();
        let table_cardinality = base_table.table_catalog.cardinality;
        let scan = LogicalScan::create(
            base_table.table_catalog.name().to_owned(),
            base_table.table_catalog.clone(),
            base_table
                .table_indexes
                .iter()
                .map(|x| x.as_ref().clone().into())
                .collect(),
            self.ctx(),
            as_of.clone(),
            table_cardinality,
        );

        match base_table.table_catalog.engine {
            Engine::Hummock => {
                match as_of {
                    None
                    | Some(AsOf::ProcessTime)
                    | Some(AsOf::TimestampNum(_))
                    | Some(AsOf::TimestampString(_))
                    | Some(AsOf::ProcessTimeWithInterval(_)) => {}
                    Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
                        bail_not_implemented!("As Of Version is not supported yet.")
                    }
                };
                Ok(scan.into())
            }
            Engine::Iceberg => {
                let is_append_only = base_table.table_catalog.append_only;
                let use_iceberg_source = match (self.plan_for(), is_append_only) {
                    (PlanFor::StreamIcebergEngineInternal, _) => false,
                    (PlanFor::BatchDql, _) => true,
                    (PlanFor::Stream | PlanFor::Batch, is_append_only) => is_append_only,
                };

                // The same `AS OF` restrictions apply whether we keep the table scan or read
                // from the Iceberg source.
                match as_of {
                    None
                    | Some(AsOf::VersionNum(_))
                    | Some(AsOf::TimestampString(_))
                    | Some(AsOf::TimestampNum(_)) => {}
                    Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                        bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                    }
                    Some(AsOf::VersionString(_)) => {
                        bail_not_implemented!("As Of Version is not supported yet.")
                    }
                }

                if !use_iceberg_source {
                    Ok(scan.into())
                } else {
                    let opt_ctx = self.ctx();
                    let session = opt_ctx.session_ctx();
                    let db_name = &session.database();
                    let catalog_reader = session.env().catalog_reader().read_guard();
                    let mut source_catalog = None;
                    for schema in catalog_reader.iter_schemas(db_name).unwrap() {
                        if schema
                            .get_table_by_id(&base_table.table_catalog.id)
                            .is_some()
                        {
                            source_catalog = schema.get_source_by_name(
                                &base_table.table_catalog.iceberg_source_name().unwrap(),
                            );
                            break;
                        }
                    }
                    if let Some(source_catalog) = source_catalog {
                        let column_map: HashMap<String, (usize, ColumnCatalog)> = source_catalog
                            .columns
                            .clone()
                            .into_iter()
                            .enumerate()
                            .map(|(i, column)| (column.name().to_owned(), (i, column)))
                            .collect();
                        let exprs = scan
                            .table_catalog()
                            .column_schema()
                            .fields()
                            .iter()
                            .map(|field| {
                                // The table's hidden row id column is exposed by the Iceberg
                                // source under `RISINGWAVE_ICEBERG_ROW_ID`.
                                let source_field_name = if field.name == ROW_ID_COLUMN_NAME {
                                    RISINGWAVE_ICEBERG_ROW_ID
                                } else {
                                    &field.name
                                };
                                if let Some((i, source_column)) = column_map.get(source_field_name)
                                {
                                    if source_column.column_desc.data_type == field.data_type {
                                        ExprImpl::InputRef(
                                            InputRef::new(*i, field.data_type.clone()).into(),
                                        )
                                    } else {
                                        let mut input_ref = ExprImpl::InputRef(
                                            InputRef::new(
                                                *i,
                                                source_column.column_desc.data_type.clone(),
                                            )
                                            .into(),
                                        );
                                        FunctionCall::cast_mut(
                                            &mut input_ref,
                                            field.data_type().clone(),
                                            CastContext::Explicit,
                                        )
                                        .unwrap();
                                        input_ref
                                    }
                                } else {
                                    // Columns absent from the source are filled with NULLs.
                                    ExprImpl::Literal(
                                        Literal::new(None, field.data_type.clone()).into(),
                                    )
                                }
                            })
                            .collect_vec();
                        let logical_source = LogicalSource::with_catalog(
                            Rc::new(source_catalog.deref().clone()),
                            SourceNodeKind::CreateMViewOrBatch,
                            self.ctx(),
                            as_of,
                        )?;
                        Ok(LogicalProject::new(logical_source.into(), exprs).into())
                    } else {
                        bail!(
                            "failed to plan an iceberg engine table: {}. Can't find the corresponding iceberg source. Maybe you need to recreate the table",
                            base_table.table_catalog.name()
                        );
                    }
                }
            }
        }
    }

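    /// Plans a source scan as a `LogicalSource`.
    ///
    /// Rejects shared CDC sources (a table should be created from them instead), unsupported
    /// `AS OF` variants, and streaming plans on sources that lack a primary key.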
    pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
        if source.is_shareable_cdc_connector() {
            Err(ErrorCode::InternalError(
                "Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source. HINT: create TABLE from the source instead.".to_owned(),
            )
            .into())
        } else {
            let as_of = source.as_of.clone();
            match as_of {
                None
                | Some(AsOf::VersionNum(_))
                | Some(AsOf::TimestampString(_))
                | Some(AsOf::TimestampNum(_)) => {}
                Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                    bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                }
                Some(AsOf::VersionString(_)) => {
                    bail_not_implemented!("As Of Version is not supported yet.")
                }
            }

            if matches!(self.plan_for(), PlanFor::Stream) {
                let has_pk =
                    source.catalog.row_id_index.is_some() || !source.catalog.pk_col_ids.is_empty();
                if !has_pk {
                    // A source without any primary key is only expected for iceberg connectors.
                    let is_iceberg = source.catalog.is_iceberg_connector();
                    debug_assert!(is_iceberg);
                    if is_iceberg {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from an iceberg source without a primary key.\nThe iceberg source might have been created in an older version of RisingWave. Please try recreating the source.\nSource: {:?}",
                            source.catalog
                        ))
                        .into());
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from a source without a primary key.
This is a bug. We would appreciate a bug report at:
https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml

source: {:?}",
                            source.catalog
                        ))
                        .into());
                    }
                }
            }
            Ok(LogicalSource::with_catalog(
                Rc::new(source.catalog),
                SourceNodeKind::CreateMViewOrBatch,
                self.ctx(),
                as_of,
            )?
            .into())
        }
    }

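    /// Plans a join between two bound relations as a `LogicalJoin`. Subqueries in the `ON`
    /// condition are not supported yet.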
    pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
        let left = self.plan_relation(join.left)?;
        let right = self.plan_relation(join.right)?;
        let join_type = join.join_type;
        let on_clause = join.cond;
        if on_clause.has_subquery() {
            bail_not_implemented!("Subquery in join on condition");
        } else {
            Ok(LogicalJoin::create(left, right, join_type, on_clause))
        }
    }

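    /// Plans a correlated join as a `LogicalApply`, assigning a fresh correlated id and
    /// collecting the correlated column indices from the right-hand side before planning it.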
    pub(super) fn plan_apply(&mut self, mut join: BoundJoin) -> Result<PlanRef> {
        let join_type = join.join_type;
        let on_clause = join.cond;
        if on_clause.has_subquery() {
            bail_not_implemented!("Subquery in join on condition");
        }

        let correlated_id = self.ctx.next_correlated_id();
        let correlated_indices = join
            .right
            .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
        let left = self.plan_relation(join.left)?;
        let right = self.plan_relation(join.right)?;

        Ok(LogicalApply::create(
            left,
            right,
            join_type,
            Condition::with_expr(on_clause),
            correlated_id,
            correlated_indices,
            false,
        ))
    }

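    /// Plans a window table function (`TUMBLE` or `HOP`) by dispatching on its kind.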
    pub(super) fn plan_window_table_function(
        &mut self,
        table_function: BoundWindowTableFunction,
    ) -> Result<PlanRef> {
        use WindowTableFunctionKind::*;
        match table_function.kind {
            Tumble => self.plan_tumble_window(
                table_function.input,
                table_function.time_col,
                table_function.args,
            ),
            Hop => self.plan_hop_window(
                table_function.input,
                table_function.time_col,
                table_function.args,
            ),
        }
    }

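    /// Plans a table function appearing in the `FROM` clause.
    ///
    /// A genuine table function becomes a `LogicalTableFunction`. Any other expression is
    /// planned as a single-row `LogicalValues`; a struct result is flattened into one column
    /// per field via `Field` calls, and `with_ordinality` appends a constant ordinality column
    /// (always `1`, since there is exactly one row).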
    pub(super) fn plan_table_function(
        &mut self,
        table_function: ExprImpl,
        with_ordinality: bool,
    ) -> Result<PlanRef> {
        match table_function {
            ExprImpl::TableFunction(tf) => {
                Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into())
            }
            expr => {
                let schema = Schema {
                    fields: vec![Field::unnamed(expr.return_type())],
                };
                let expr_return_type = expr.return_type();
                let root = LogicalValues::create(vec![vec![expr]], schema, self.ctx());
                let input_ref = ExprImpl::from(InputRef::new(0, expr_return_type.clone()));
                let mut exprs = if let DataType::Struct(st) = expr_return_type {
                    st.iter()
                        .enumerate()
                        .map(|(i, (_, ty))| {
                            let idx = ExprImpl::literal_int(i.try_into().unwrap());
                            let args = vec![input_ref.clone(), idx];
                            FunctionCall::new_unchecked(ExprType::Field, args, ty.clone()).into()
                        })
                        .collect()
                } else {
                    vec![input_ref]
                };
                if with_ordinality {
                    exprs.push(ExprImpl::literal_bigint(1));
                }
                Ok(LogicalProject::create(root, exprs))
            }
        }
    }

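    /// Plans a shared relation (`BoundShare`).
    ///
    /// Non-recursive query inputs are planned once, wrapped in a `LogicalShare`, and cached by
    /// share id so later references reuse the same subplan. Recursive unions are delegated to
    /// `plan_recursive_union`, and changelog inputs are planned via `plan_changelog`.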
    pub(super) fn plan_share(&mut self, share: BoundShare) -> Result<PlanRef> {
        match share.input {
            BoundShareInput::Query(Either::Left(nonrecursive_query)) => {
                let id = share.share_id;
                match self.share_cache.get(&id) {
                    None => {
                        let result = self
                            .plan_query(nonrecursive_query)?
                            .into_unordered_subplan();
                        let logical_share = LogicalShare::create(result);
                        self.share_cache.insert(id, logical_share.clone());
                        Ok(logical_share)
                    }
                    Some(result) => Ok(result.clone()),
                }
            }
            BoundShareInput::Query(Either::Right(recursive_union)) => self.plan_recursive_union(
                *recursive_union.base,
                *recursive_union.recursive,
                share.share_id,
            ),
            BoundShareInput::ChangeLog(relation) => {
                let id = share.share_id;
                let result = self.plan_changelog(relation)?;
                let logical_share = LogicalShare::create(result);
                self.share_cache.insert(id, logical_share.clone());
                Ok(logical_share)
            }
        }
    }

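    /// Plans a watermark relation. Not implemented yet.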
    pub(super) fn plan_watermark(&mut self, _watermark: BoundWatermark) -> Result<PlanRef> {
        todo!("plan watermark");
    }

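    /// Plans a back-reference to a recursive CTE as a `LogicalCteRef`, using the plan of the
    /// CTE's base branch as its input.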
    pub(super) fn plan_cte_ref(&mut self, cte_ref: BoundBackCteRef) -> Result<PlanRef> {
        let base = self.plan_set_expr(cte_ref.base, vec![], &[])?;
        Ok(LogicalCteRef::create(cte_ref.share_id, base))
    }

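    /// Collects the column data types of a `TUMBLE` input relation, which are needed to forward
    /// every input column through the projection that appends the window columns. Only sources,
    /// base tables, subqueries, and shares are valid tumble inputs.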
    fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result<Vec<DataType>> {
        let col_data_types = match relation {
            Relation::Source(s) => s
                .catalog
                .columns
                .iter()
                .map(|col| col.data_type().clone())
                .collect(),
            Relation::BaseTable(t) => t
                .table_catalog
                .columns
                .iter()
                .map(|col| col.data_type().clone())
                .collect(),
            Relation::Subquery(q) => q.query.schema().data_types(),
            Relation::Share(share) => share
                .input
                .fields()?
                .into_iter()
                .map(|(_, f)| f.data_type)
                .collect(),
            r => {
                return Err(ErrorCode::BindError(format!(
                    "Invalid input relation to tumble: {r:?}"
                ))
                .into());
            }
        };
        Ok(col_data_types)
    }

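    /// Plans `TUMBLE(input, time_col, size[, offset])` as a projection over the input that
    /// forwards all input columns and appends `window_start = tumble_start(time_col, size[, offset])`
    /// and `window_end = window_start + size`. The size (and offset, if present) must be
    /// interval literals.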
    fn plan_tumble_window(
        &mut self,
        input: Relation,
        time_col: InputRef,
        args: Vec<ExprImpl>,
    ) -> Result<PlanRef> {
        let mut args = args.into_iter();
        let col_data_types: Vec<_> = Self::collect_col_data_types_for_tumble_window(&input)?;

        match (args.next(), args.next(), args.next()) {
            // Only a window size is given.
            (Some(window_size @ ExprImpl::Literal(_)), None, None) => {
                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
                for (idx, col_dt) in col_data_types.iter().enumerate() {
                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
                }
                let window_start: ExprImpl = FunctionCall::new(
                    ExprType::TumbleStart,
                    vec![ExprImpl::InputRef(Box::new(time_col)), window_size.clone()],
                )?
                .into();
                let window_end =
                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
                        .into();
                exprs.push(window_start);
                exprs.push(window_end);
                let base = self.plan_relation(input)?;
                let project = LogicalProject::create(base, exprs);
                Ok(project)
            }
            // A window size and an offset are given.
            (
                Some(window_size @ ExprImpl::Literal(_)),
                Some(window_offset @ ExprImpl::Literal(_)),
                None,
            ) => {
                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
                for (idx, col_dt) in col_data_types.iter().enumerate() {
                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
                }
                let window_start: ExprImpl = FunctionCall::new(
                    ExprType::TumbleStart,
                    vec![
                        ExprImpl::InputRef(Box::new(time_col)),
                        window_size.clone(),
                        window_offset,
                    ],
                )?
                .into();
                let window_end =
                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
                        .into();
                exprs.push(window_start);
                exprs.push(window_end);
                let base = self.plan_relation(input)?;
                let project = LogicalProject::create(base, exprs);
                Ok(project)
            }
            _ => Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
        }
    }

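    /// Plans `HOP(input, time_col, slide, size[, offset])` as a `LogicalHopWindow`.
    ///
    /// Slide, size, and the optional offset must be interval literals; slide and size must be
    /// positive, and size must be an exact multiple of slide.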
    fn plan_hop_window(
        &mut self,
        input: Relation,
        time_col: InputRef,
        args: Vec<ExprImpl>,
    ) -> Result<PlanRef> {
        let input = self.plan_relation(input)?;
        let mut args = args.into_iter();
        let Some((ExprImpl::Literal(window_slide), ExprImpl::Literal(window_size))) =
            args.next_tuple()
        else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };

        let Some(ScalarImpl::Interval(window_slide)) = *window_slide.get_data() else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };
        let Some(ScalarImpl::Interval(window_size)) = *window_size.get_data() else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };

        let window_offset = match (args.next(), args.next()) {
            (Some(ExprImpl::Literal(window_offset)), None) => match *window_offset.get_data() {
                Some(ScalarImpl::Interval(window_offset)) => window_offset,
                _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
            },
            (None, None) => Interval::from_month_day_usec(0, 0, 0),
            _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
        };

        if !window_size.is_positive() || !window_slide.is_positive() {
            return Err(ErrorCode::BindError(format!(
                "window_size {} and window_slide {} must be positive",
                window_size, window_slide
            ))
            .into());
        }

        if window_size.exact_div(&window_slide).is_none() {
            return Err(ErrorCode::BindError(format!(
                "Invalid arguments for HOP window function: window_size {} cannot be divided by window_slide {}",
                window_size, window_slide
            ))
            .into());
        }

        Ok(LogicalHopWindow::create(
            input,
            time_col,
            window_slide,
            window_size,
            window_offset,
        ))
    }
}