use std::collections::HashMap;
use std::ops::Deref;
use std::rc::Rc;

use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, Engine, Field, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, Schema,
};
use risingwave_common::types::{DataType, Interval, ScalarImpl};
use risingwave_common::{bail, bail_not_implemented};
use risingwave_sqlparser::ast::AsOf;

use crate::binder::{
    BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundShareInput, BoundSource,
    BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind,
};
use crate::error::{ErrorCode, Result};
use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{
    LogicalApply, LogicalCteRef, LogicalHopWindow, LogicalJoin, LogicalPlanRef as PlanRef,
    LogicalProject, LogicalScan, LogicalShare, LogicalSource, LogicalSysScan, LogicalTableFunction,
    LogicalValues,
};
use crate::optimizer::property::Cardinality;
use crate::planner::{PlanFor, Planner};
use crate::utils::Condition;

const ERROR_WINDOW_SIZE_ARG: &str =
    "The size arg of window table function should be an interval literal.";

impl Planner {
    pub fn plan_relation(&mut self, relation: Relation) -> Result<PlanRef> {
        match relation {
            Relation::BaseTable(t) => self.plan_base_table(&t),
            Relation::SystemTable(st) => self.plan_sys_table(*st),
            Relation::Subquery(q) => Ok(self.plan_query(q.query)?.into_unordered_subplan()),
            Relation::Join(join) => self.plan_join(*join),
            Relation::Apply(join) => self.plan_apply(*join),
            Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf),
            Relation::Source(s) => self.plan_source(*s),
            Relation::TableFunction {
                expr: tf,
                with_ordinality,
            } => self.plan_table_function(tf, with_ordinality),
            Relation::Watermark(tf) => self.plan_watermark(*tf),
            Relation::Share(share) => self.plan_share(*share),
            Relation::BackCteRef(cte_ref) => self.plan_cte_ref(*cte_ref),
        }
    }

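    /// Plans a system table scan as a `LogicalSysScan` with unknown cardinality.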
    pub(crate) fn plan_sys_table(&mut self, sys_table: BoundSystemTable) -> Result<PlanRef> {
        Ok(LogicalSysScan::create(
            sys_table.sys_table_catalog,
            self.ctx(),
            Cardinality::unknown(),
        )
        .into())
    }

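    /// Plans a scan over a user table.
    ///
    /// Tables on the Hummock engine always become a `LogicalScan`. For tables on the Iceberg
    /// engine, the plan either scans the table state directly (`LogicalScan`) or reads the
    /// backing iceberg source (`LogicalSource` wrapped in a `LogicalProject` that re-maps and
    /// casts the source columns to the table schema), depending on `PlanFor` and whether the
    /// table is append-only; see `use_iceberg_source` below.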
    pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result<PlanRef> {
        let as_of = base_table.as_of.clone();
        let scan = LogicalScan::from_base_table(base_table, self.ctx(), as_of.clone());

        match base_table.table_catalog.engine {
            Engine::Hummock => {
                match as_of {
                    None
                    | Some(AsOf::ProcessTime)
                    | Some(AsOf::TimestampNum(_))
                    | Some(AsOf::TimestampString(_))
                    | Some(AsOf::ProcessTimeWithInterval(_)) => {}
                    Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
                        bail_not_implemented!("As Of Version is not supported yet.")
                    }
                };
                Ok(scan.into())
            }
            Engine::Iceberg => {
                let is_append_only = base_table.table_catalog.append_only;
                let use_iceberg_source = match (self.plan_for(), is_append_only) {
                    (PlanFor::StreamIcebergEngineInternal, _) => false,
                    (PlanFor::BatchDql, _) => true,
                    (PlanFor::Stream | PlanFor::Batch, is_append_only) => is_append_only,
                };

                // Both branches accept the same `AS OF` variants, so validate them once up front.
                match as_of {
                    None
                    | Some(AsOf::VersionNum(_))
                    | Some(AsOf::TimestampString(_))
                    | Some(AsOf::TimestampNum(_)) => {}
                    Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                        bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                    }
                    Some(AsOf::VersionString(_)) => {
                        bail_not_implemented!("As Of Version is not supported yet.")
                    }
                }

                if !use_iceberg_source {
                    Ok(scan.into())
                } else {
                    let opt_ctx = self.ctx();
                    let session = opt_ctx.session_ctx();
                    let db_name = &session.database();
                    let catalog_reader = session.env().catalog_reader().read_guard();
                    let mut source_catalog = None;
                    for schema in catalog_reader.iter_schemas(db_name).unwrap() {
                        if schema
                            .get_table_by_id(&base_table.table_catalog.id)
                            .is_some()
                        {
                            source_catalog = schema.get_source_by_name(
                                &base_table.table_catalog.iceberg_source_name().unwrap(),
                            );
                            break;
                        }
                    }
                    if let Some(source_catalog) = source_catalog {
                        let column_map: HashMap<String, (usize, ColumnCatalog)> = source_catalog
                            .columns
                            .clone()
                            .into_iter()
                            .enumerate()
                            .map(|(i, column)| (column.name().to_owned(), (i, column)))
                            .collect();
                        let exprs = scan
                            .table()
                            .column_schema()
                            .fields()
                            .iter()
                            .map(|field| {
                                // Look up the table's hidden row id column under its name on the
                                // iceberg source side.
                                let source_field_name = if field.name == ROW_ID_COLUMN_NAME {
                                    RISINGWAVE_ICEBERG_ROW_ID
                                } else {
                                    &field.name
                                };
                                if let Some((i, source_column)) = column_map.get(source_field_name)
                                {
                                    if source_column.column_desc.data_type == field.data_type {
                                        ExprImpl::InputRef(
                                            InputRef::new(*i, field.data_type.clone()).into(),
                                        )
                                    } else {
                                        // Same column, different type: read the source column and
                                        // cast it to the table column's type.
                                        let mut input_ref = ExprImpl::InputRef(
                                            InputRef::new(
                                                *i,
                                                source_column.column_desc.data_type.clone(),
                                            )
                                            .into(),
                                        );
                                        FunctionCall::cast_mut(
                                            &mut input_ref,
                                            &field.data_type(),
                                            CastContext::Explicit,
                                        )
                                        .unwrap();
                                        input_ref
                                    }
                                } else {
                                    // Column missing from the iceberg source: pad with NULL.
                                    ExprImpl::Literal(
                                        Literal::new(None, field.data_type.clone()).into(),
                                    )
                                }
                            })
                            .collect_vec();
                        let logical_source = LogicalSource::with_catalog(
                            Rc::new(source_catalog.deref().clone()),
                            SourceNodeKind::CreateMViewOrBatch,
                            self.ctx(),
                            as_of,
                        )?;
                        Ok(LogicalProject::new(logical_source.into(), exprs).into())
                    } else {
                        bail!(
                            "failed to plan an iceberg engine table: {}. Can't find the corresponding iceberg source. Maybe you need to recreate the table",
                            base_table.table_catalog.name()
                        );
                    }
                }
            }
        }
    }

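    /// Plans a scan over a source.
    ///
    /// Shareable CDC sources cannot be queried or materialized directly. `AS OF` process time and
    /// version strings are rejected as not yet supported, and streaming plans additionally
    /// require the source to have a row id column or a primary key.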
    pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
        if source.is_shareable_cdc_connector() {
            Err(ErrorCode::InternalError(
                "Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source. HINT: create TABLE from the source instead.".to_owned(),
            )
            .into())
        } else {
            let as_of = source.as_of.clone();
            match as_of {
                None
                | Some(AsOf::VersionNum(_))
                | Some(AsOf::TimestampString(_))
                | Some(AsOf::TimestampNum(_)) => {}
                Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                    bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                }
                Some(AsOf::VersionString(_)) => {
                    bail_not_implemented!("As Of Version is not supported yet.")
                }
            }

            if matches!(self.plan_for(), PlanFor::Stream) {
                let has_pk =
                    source.catalog.row_id_index.is_some() || !source.catalog.pk_col_ids.is_empty();
                if !has_pk {
                    let is_iceberg = source.catalog.is_iceberg_connector();
                    debug_assert!(is_iceberg);
                    if is_iceberg {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from an iceberg source without a primary key.\nThe iceberg source might be created in an older version of RisingWave. Please try recreating the source.\nSource: {:?}",
                            source.catalog
                        ))
                        .into());
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from a source without a primary key.
This is a bug. We would appreciate a bug report at:
https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml

source: {:?}",
                            source.catalog
                        ))
                        .into());
                    }
                }
            }
            Ok(LogicalSource::with_catalog(
                Rc::new(source.catalog),
                SourceNodeKind::CreateMViewOrBatch,
                self.ctx(),
                as_of,
            )?
            .into())
        }
    }

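    /// Plans a regular (non-correlated) join. Subqueries in the `ON` condition are not supported
    /// here.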
    pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
        let left = self.plan_relation(join.left)?;
        let right = self.plan_relation(join.right)?;
        let join_type = join.join_type;
        let on_clause = join.cond;
        if on_clause.has_subquery() {
            bail_not_implemented!("Subquery in join on condition");
        } else {
            Ok(LogicalJoin::create(left, right, join_type, on_clause))
        }
    }

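    /// Plans a correlated (lateral) join as a `LogicalApply`.
    ///
    /// A fresh `correlated_id` is allocated and the correlated indices of the right side are
    /// collected before either side is planned.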
    pub(super) fn plan_apply(&mut self, mut join: BoundJoin) -> Result<PlanRef> {
        let join_type = join.join_type;
        let on_clause = join.cond;
        if on_clause.has_subquery() {
            bail_not_implemented!("Subquery in join on condition");
        }

        let correlated_id = self.ctx.next_correlated_id();
        let correlated_indices = join
            .right
            .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
        let left = self.plan_relation(join.left)?;
        let right = self.plan_relation(join.right)?;

        Ok(LogicalApply::create(
            left,
            right,
            join_type,
            Condition::with_expr(on_clause),
            correlated_id,
            correlated_indices,
            false,
        ))
    }

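    /// Dispatches a window table function (`TUMBLE` or `HOP`) to the corresponding planner.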
    pub(super) fn plan_window_table_function(
        &mut self,
        table_function: BoundWindowTableFunction,
    ) -> Result<PlanRef> {
        use WindowTableFunctionKind::*;
        match table_function.kind {
            Tumble => self.plan_tumble_window(
                table_function.input,
                table_function.time_col,
                table_function.args,
            ),
            Hop => self.plan_hop_window(
                table_function.input,
                table_function.time_col,
                table_function.args,
            ),
        }
    }

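    /// Plans a table function used as a relation.
    ///
    /// A genuine table function becomes a `LogicalTableFunction`. Any other expression is
    /// evaluated through a single-row `LogicalValues`; if it returns a struct, each struct field
    /// is unpacked into its own output column, and `with_ordinality` appends a constant
    /// ordinality column.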
    pub(super) fn plan_table_function(
        &mut self,
        table_function: ExprImpl,
        with_ordinality: bool,
    ) -> Result<PlanRef> {
        match table_function {
            ExprImpl::TableFunction(tf) => {
                Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into())
            }
            expr => {
                let schema = Schema {
                    fields: vec![Field::unnamed(expr.return_type())],
                };
                let expr_return_type = expr.return_type();
                let root = LogicalValues::create(vec![vec![expr]], schema, self.ctx());
                let input_ref = ExprImpl::from(InputRef::new(0, expr_return_type.clone()));
                let mut exprs = if let DataType::Struct(st) = expr_return_type {
                    st.iter()
                        .enumerate()
                        .map(|(i, (_, ty))| {
                            let idx = ExprImpl::literal_int(i.try_into().unwrap());
                            let args = vec![input_ref.clone(), idx];
                            FunctionCall::new_unchecked(ExprType::Field, args, ty.clone()).into()
                        })
                        .collect()
                } else {
                    vec![input_ref]
                };
                if with_ordinality {
                    exprs.push(ExprImpl::literal_bigint(1));
                }
                Ok(LogicalProject::create(root, exprs))
            }
        }
    }

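    /// Plans a shared relation, e.g. a CTE that may be referenced more than once.
    ///
    /// Non-recursive queries are planned once and cached in `share_cache` keyed by the share id,
    /// so later references reuse the same `LogicalShare`; recursive unions and changelog inputs
    /// are delegated to their dedicated planners.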
    pub(super) fn plan_share(&mut self, share: BoundShare) -> Result<PlanRef> {
        match share.input {
            BoundShareInput::Query(Either::Left(nonrecursive_query)) => {
                let id = share.share_id;
                match self.share_cache.get(&id) {
                    None => {
                        let result = self
                            .plan_query(nonrecursive_query)?
                            .into_unordered_subplan();
                        let logical_share = LogicalShare::create(result);
                        self.share_cache.insert(id, logical_share.clone());
                        Ok(logical_share)
                    }
                    Some(result) => Ok(result.clone()),
                }
            }
            BoundShareInput::Query(Either::Right(recursive_union)) => self.plan_recursive_union(
                *recursive_union.base,
                *recursive_union.recursive,
                share.share_id,
            ),
            BoundShareInput::ChangeLog(relation) => {
                let id = share.share_id;
                let result = self.plan_changelog(relation)?;
                let logical_share = LogicalShare::create(result);
                self.share_cache.insert(id, logical_share.clone());
                Ok(logical_share)
            }
        }
    }

    pub(super) fn plan_watermark(&mut self, _watermark: BoundWatermark) -> Result<PlanRef> {
        todo!("plan watermark");
    }

    pub(super) fn plan_cte_ref(&mut self, cte_ref: BoundBackCteRef) -> Result<PlanRef> {
        let base = self.plan_set_expr(cte_ref.base, vec![], &[])?;
        Ok(LogicalCteRef::create(cte_ref.share_id, base))
    }

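    /// Collects the column data types of the relation passed to `TUMBLE`. Only sources, base
    /// tables, subqueries, and shared relations are accepted as tumble input.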
    fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result<Vec<DataType>> {
        let col_data_types = match relation {
            Relation::Source(s) => s
                .catalog
                .columns
                .iter()
                .map(|col| col.data_type().clone())
                .collect(),
            Relation::BaseTable(t) => t
                .table_catalog
                .columns
                .iter()
                .map(|col| col.data_type().clone())
                .collect(),
            Relation::Subquery(q) => q.query.schema().data_types(),
            Relation::Share(share) => share
                .input
                .fields()?
                .into_iter()
                .map(|(_, f)| f.data_type)
                .collect(),
            r => {
                return Err(ErrorCode::BindError(format!(
                    "Invalid input relation to tumble: {r:?}"
                ))
                .into());
            }
        };
        Ok(col_data_types)
    }

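    /// Plans `TUMBLE(input, time_col, window_size [, offset])` as a projection over the input.
    ///
    /// All input columns are forwarded unchanged, followed by two computed columns:
    /// `window_start = tumble_start(time_col, window_size [, offset])` and
    /// `window_end = window_start + window_size`. The window size (and offset, if given) must be
    /// literals.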
    fn plan_tumble_window(
        &mut self,
        input: Relation,
        time_col: InputRef,
        args: Vec<ExprImpl>,
    ) -> Result<PlanRef> {
        let mut args = args.into_iter();
        let col_data_types: Vec<_> = Self::collect_col_data_types_for_tumble_window(&input)?;

        match (args.next(), args.next(), args.next()) {
            (Some(window_size @ ExprImpl::Literal(_)), None, None) => {
                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
                for (idx, col_dt) in col_data_types.iter().enumerate() {
                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
                }
                let window_start: ExprImpl = FunctionCall::new(
                    ExprType::TumbleStart,
                    vec![ExprImpl::InputRef(Box::new(time_col)), window_size.clone()],
                )?
                .into();
                let window_end =
                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
                        .into();
                exprs.push(window_start);
                exprs.push(window_end);
                let base = self.plan_relation(input)?;
                let project = LogicalProject::create(base, exprs);
                Ok(project)
            }
            (
                Some(window_size @ ExprImpl::Literal(_)),
                Some(window_offset @ ExprImpl::Literal(_)),
                None,
            ) => {
                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
                for (idx, col_dt) in col_data_types.iter().enumerate() {
                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
                }
                let window_start: ExprImpl = FunctionCall::new(
                    ExprType::TumbleStart,
                    vec![
                        ExprImpl::InputRef(Box::new(time_col)),
                        window_size.clone(),
                        window_offset,
                    ],
                )?
                .into();
                let window_end =
                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
                        .into();
                exprs.push(window_start);
                exprs.push(window_end);
                let base = self.plan_relation(input)?;
                let project = LogicalProject::create(base, exprs);
                Ok(project)
            }
            _ => Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
        }
    }

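    /// Plans `HOP(input, time_col, window_slide, window_size [, offset])` as a
    /// `LogicalHopWindow`.
    ///
    /// The slide, size, and optional offset must be interval literals; the size and slide must
    /// both be positive, and the size must be an exact multiple of the slide.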
    fn plan_hop_window(
        &mut self,
        input: Relation,
        time_col: InputRef,
        args: Vec<ExprImpl>,
    ) -> Result<PlanRef> {
        let input = self.plan_relation(input)?;
        let mut args = args.into_iter();
        let Some((ExprImpl::Literal(window_slide), ExprImpl::Literal(window_size))) =
            args.next_tuple()
        else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };

        let Some(ScalarImpl::Interval(window_slide)) = *window_slide.get_data() else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };
        let Some(ScalarImpl::Interval(window_size)) = *window_size.get_data() else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };

        let window_offset = match (args.next(), args.next()) {
            (Some(ExprImpl::Literal(window_offset)), None) => match *window_offset.get_data() {
                Some(ScalarImpl::Interval(window_offset)) => window_offset,
                _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
            },
            (None, None) => Interval::from_month_day_usec(0, 0, 0),
            _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
        };

        if !window_size.is_positive() || !window_slide.is_positive() {
            return Err(ErrorCode::BindError(format!(
                "window_size {} and window_slide {} must be positive",
                window_size, window_slide
            ))
            .into());
        }

        if window_size.exact_div(&window_slide).is_none() {
            return Err(ErrorCode::BindError(format!(
                "Invalid arguments for HOP window function: window_size {} cannot be divided by window_slide {}",
                window_size, window_slide
            ))
            .into());
        }

        Ok(LogicalHopWindow::create(
            input,
            time_col,
            window_slide,
            window_size,
            window_offset,
        ))
    }
}