use std::collections::HashMap;
use std::ops::Deref;
use std::rc::Rc;

use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, Engine, Field, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, Schema,
};
use risingwave_common::types::{DataType, Interval, ScalarImpl};
use risingwave_common::{bail, bail_not_implemented};
use risingwave_sqlparser::ast::AsOf;

use crate::binder::{
    BoundBackCteRef, BoundBaseTable, BoundGapFill, BoundJoin, BoundShare, BoundShareInput,
    BoundSource, BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation,
    WindowTableFunctionKind,
};
use crate::error::{ErrorCode, Result};
use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{
    LogicalApply, LogicalCteRef, LogicalGapFill, LogicalHopWindow, LogicalJoin,
    LogicalPlanRef as PlanRef, LogicalProject, LogicalScan, LogicalShare, LogicalSource,
    LogicalSysScan, LogicalTableFunction, LogicalValues,
};
use crate::optimizer::property::Cardinality;
use crate::planner::{PlanFor, Planner};
use crate::utils::Condition;

const ERROR_WINDOW_SIZE_ARG: &str =
    "The size arg of window table function should be an interval literal.";

impl Planner {
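    /// Plans a bound [`Relation`] by dispatching to the planner for each variant.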
    pub fn plan_relation(&mut self, relation: Relation) -> Result<PlanRef> {
        match relation {
            Relation::BaseTable(t) => self.plan_base_table(&t),
            Relation::SystemTable(st) => self.plan_sys_table(*st),
            Relation::Subquery(q) => Ok(self.plan_query(q.query)?.into_unordered_subplan()),
            Relation::Join(join) => self.plan_join(*join),
            Relation::Apply(join) => self.plan_apply(*join),
            Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf),
            Relation::Source(s) => self.plan_source(*s),
            Relation::TableFunction {
                expr: tf,
                with_ordinality,
            } => self.plan_table_function(tf, with_ordinality),
            Relation::Watermark(tf) => self.plan_watermark(*tf),
            Relation::Share(share) => self.plan_share(*share),
            Relation::BackCteRef(cte_ref) => self.plan_cte_ref(*cte_ref),
            Relation::GapFill(bound_gap_fill) => self.plan_gap_fill(*bound_gap_fill),
        }
    }

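    /// Plans a scan over a system table with unknown cardinality.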
    pub(crate) fn plan_sys_table(&mut self, sys_table: BoundSystemTable) -> Result<PlanRef> {
        Ok(LogicalSysScan::create(
            sys_table.sys_table_catalog,
            self.ctx(),
            Cardinality::unknown(),
        )
        .into())
    }

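    /// Plans a scan over a user table.
    ///
    /// Tables on the `Hummock` engine become a plain `LogicalScan`. Tables on the `Iceberg`
    /// engine may instead be read through their backing Iceberg source, with the source columns
    /// projected back into the table's schema. Unsupported `AS OF` variants are rejected.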
    pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result<PlanRef> {
        let as_of = base_table.as_of.clone();
        let scan = LogicalScan::from_base_table(base_table, self.ctx(), as_of.clone());

        match base_table.table_catalog.engine {
            Engine::Hummock => {
                match as_of {
                    None
                    | Some(AsOf::ProcessTime)
                    | Some(AsOf::TimestampNum(_))
                    | Some(AsOf::TimestampString(_))
                    | Some(AsOf::ProcessTimeWithInterval(_)) => {}
                    Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
                        bail_not_implemented!("As Of Version is not supported yet.")
                    }
                };
                Ok(scan.into())
            }
            Engine::Iceberg => {
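                // Decide whether to read through the backing Iceberg source: the internal
                // streaming plan always scans the table, batch DQL always reads the source,
                // and otherwise only append-only tables go through the source.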
                let is_append_only = base_table.table_catalog.append_only;
                let use_iceberg_source = match (self.plan_for(), is_append_only) {
                    (PlanFor::StreamIcebergEngineInternal, _) => false,
                    (PlanFor::BatchDql, _) => true,
                    (PlanFor::Stream | PlanFor::Batch, is_append_only) => is_append_only,
                };

                if !use_iceberg_source {
                    match as_of {
                        None
                        | Some(AsOf::VersionNum(_))
                        | Some(AsOf::TimestampString(_))
                        | Some(AsOf::TimestampNum(_)) => {}
                        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                            bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                        }
                        Some(AsOf::VersionString(_)) => {
                            bail_not_implemented!("As Of Version is not supported yet.")
                        }
                    }
                    Ok(scan.into())
                } else {
                    match as_of {
                        None
                        | Some(AsOf::VersionNum(_))
                        | Some(AsOf::TimestampString(_))
                        | Some(AsOf::TimestampNum(_)) => {}
                        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                            bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                        }
                        Some(AsOf::VersionString(_)) => {
                            bail_not_implemented!("As Of Version is not supported yet.")
                        }
                    }
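                    // Locate the Iceberg source that backs this table by searching the schemas
                    // of the current database for the table's generated iceberg source name.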
                    let opt_ctx = self.ctx();
                    let session = opt_ctx.session_ctx();
                    let db_name = &session.database();
                    let catalog_reader = session.env().catalog_reader().read_guard();
                    let mut source_catalog = None;
                    for schema in catalog_reader.iter_schemas(db_name).unwrap() {
                        if schema
                            .get_table_by_id(&base_table.table_catalog.id)
                            .is_some()
                        {
                            source_catalog = schema.get_source_by_name(
                                &base_table.table_catalog.iceberg_source_name().unwrap(),
                            );
                            break;
                        }
                    }
                    if let Some(source_catalog) = source_catalog {
                        let column_map: HashMap<String, (usize, ColumnCatalog)> = source_catalog
                            .columns
                            .clone()
                            .into_iter()
                            .enumerate()
                            .map(|(i, column)| (column.name().to_owned(), (i, column)))
                            .collect();
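                        // Build projection expressions that map each field of the table schema
                        // to the corresponding source column: reuse the column when the types
                        // match, cast when they differ, and fall back to a NULL literal when
                        // the source has no such column. The hidden row id column is mapped to
                        // RISINGWAVE_ICEBERG_ROW_ID.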
                        let exprs = scan
                            .table()
                            .column_schema()
                            .fields()
                            .iter()
                            .map(|field| {
                                let source_field_name = if field.name == ROW_ID_COLUMN_NAME {
                                    RISINGWAVE_ICEBERG_ROW_ID
                                } else {
                                    &field.name
                                };
                                if let Some((i, source_column)) = column_map.get(source_field_name)
                                {
                                    if source_column.column_desc.data_type == field.data_type {
                                        ExprImpl::InputRef(
                                            InputRef::new(*i, field.data_type.clone()).into(),
                                        )
                                    } else {
                                        let mut input_ref = ExprImpl::InputRef(
                                            InputRef::new(
                                                *i,
                                                source_column.column_desc.data_type.clone(),
                                            )
                                            .into(),
                                        );
                                        FunctionCall::cast_mut(
                                            &mut input_ref,
                                            &field.data_type(),
                                            CastContext::Explicit,
                                        )
                                        .unwrap();
                                        input_ref
                                    }
                                } else {
                                    ExprImpl::Literal(
                                        Literal::new(None, field.data_type.clone()).into(),
                                    )
                                }
                            })
                            .collect_vec();
                        let logical_source = LogicalSource::with_catalog(
                            Rc::new(source_catalog.deref().clone()),
                            SourceNodeKind::CreateMViewOrBatch,
                            self.ctx(),
                            as_of,
                        )?;
                        Ok(LogicalProject::new(logical_source.into(), exprs).into())
                    } else {
                        bail!(
                            "failed to plan an iceberg engine table: {}. Can't find the corresponding iceberg source. Maybe you need to recreate the table",
                            base_table.table_catalog.name()
                        );
                    }
                }
            }
        }
    }

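    /// Plans a scan over a source.
    ///
    /// Direct reads on a shareable CDC source are rejected (a table must be created from it
    /// first), unsupported `AS OF` variants bail out, and streaming plans additionally require
    /// the source to have a primary key or a row id column.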
    pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
        if source.is_shareable_cdc_connector() {
            Err(ErrorCode::InternalError(
                "Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source. HINT: create TABLE from the source instead.".to_owned(),
            )
            .into())
        } else {
            let as_of = source.as_of.clone();
            match as_of {
                None
                | Some(AsOf::VersionNum(_))
                | Some(AsOf::TimestampString(_))
                | Some(AsOf::TimestampNum(_)) => {}
                Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                    bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                }
                Some(AsOf::VersionString(_)) => {
                    bail_not_implemented!("As Of Version is not supported yet.")
                }
            }

            if matches!(self.plan_for(), PlanFor::Stream) {
                let has_pk =
                    source.catalog.row_id_index.is_some() || !source.catalog.pk_col_ids.is_empty();
                if !has_pk {
                    let is_iceberg = source.catalog.is_iceberg_connector();
                    debug_assert!(is_iceberg);
                    if is_iceberg {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from an iceberg source without a primary key.\nThe iceberg source might have been created in an older version of RisingWave. Please try recreating the source.\nSource: {:?}",
                            source.catalog
                        ))
                        .into());
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from a source without a primary key.
This is a bug. We would appreciate a bug report at:
https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml

source: {:?}",
                            source.catalog
                        ))
                        .into());
                    }
                }
            }
            Ok(LogicalSource::with_catalog(
                Rc::new(source.catalog),
                SourceNodeKind::CreateMViewOrBatch,
                self.ctx(),
                as_of,
            )?
            .into())
        }
    }

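    /// Plans an uncorrelated join. Subqueries in the `ON` clause are not supported yet.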
    pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
        let left = self.plan_relation(join.left)?;
        let right = self.plan_relation(join.right)?;
        let join_type = join.join_type;
        let on_clause = join.cond;
        if on_clause.has_subquery() {
            bail_not_implemented!("Subquery in join on condition");
        } else {
            Ok(LogicalJoin::create(left, right, join_type, on_clause))
        }
    }

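    /// Plans a lateral (correlated) join as a `LogicalApply`.
    ///
    /// A fresh correlated id is assigned and the correlated column indices referenced by the
    /// right side are collected before both sides are planned.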
    pub(super) fn plan_apply(&mut self, mut join: BoundJoin) -> Result<PlanRef> {
        let join_type = join.join_type;
        let on_clause = join.cond;
        if on_clause.has_subquery() {
            bail_not_implemented!("Subquery in join on condition");
        }

        let correlated_id = self.ctx.next_correlated_id();
        let correlated_indices = join
            .right
            .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
        let left = self.plan_relation(join.left)?;
        let right = self.plan_relation(join.right)?;

        Ok(LogicalApply::create(
            left,
            right,
            join_type,
            Condition::with_expr(on_clause),
            correlated_id,
            correlated_indices,
            false,
        ))
    }

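    /// Plans a `TUMBLE` or `HOP` window table function over its input relation.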
    pub(super) fn plan_window_table_function(
        &mut self,
        table_function: BoundWindowTableFunction,
    ) -> Result<PlanRef> {
        use WindowTableFunctionKind::*;
        match table_function.kind {
            Tumble => self.plan_tumble_window(
                table_function.input,
                table_function.time_col,
                table_function.args,
            ),
            Hop => self.plan_hop_window(
                table_function.input,
                table_function.time_col,
                table_function.args,
            ),
        }
    }

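    /// Plans a table function in the `FROM` clause.
    ///
    /// A genuine table function becomes a `LogicalTableFunction`. Any other expression is
    /// evaluated as a single-row `LogicalValues`; if it returns a struct, each field is
    /// projected out as a separate column, and `WITH ORDINALITY` appends a constant `1`.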
    pub(super) fn plan_table_function(
        &mut self,
        table_function: ExprImpl,
        with_ordinality: bool,
    ) -> Result<PlanRef> {
        match table_function {
            ExprImpl::TableFunction(tf) => {
                Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into())
            }
            expr => {
                let schema = Schema {
                    fields: vec![Field::unnamed(expr.return_type())],
                };
                let expr_return_type = expr.return_type();
                let root = LogicalValues::create(vec![vec![expr]], schema, self.ctx());
                let input_ref = ExprImpl::from(InputRef::new(0, expr_return_type.clone()));
                let mut exprs = if let DataType::Struct(st) = expr_return_type {
                    st.iter()
                        .enumerate()
                        .map(|(i, (_, ty))| {
                            let idx = ExprImpl::literal_int(i.try_into().unwrap());
                            let args = vec![input_ref.clone(), idx];
                            FunctionCall::new_unchecked(ExprType::Field, args, ty.clone()).into()
                        })
                        .collect()
                } else {
                    vec![input_ref]
                };
                if with_ordinality {
                    exprs.push(ExprImpl::literal_bigint(1));
                }
                Ok(LogicalProject::create(root, exprs))
            }
        }
    }

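    /// Plans a shared relation.
    ///
    /// Non-recursive query inputs are planned once, wrapped in a `LogicalShare`, and cached by
    /// `share_id` so later references reuse the same subplan. Recursive unions are delegated to
    /// `plan_recursive_union`, and changelog inputs are likewise wrapped in a `LogicalShare`.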
    pub(super) fn plan_share(&mut self, share: BoundShare) -> Result<PlanRef> {
        match share.input {
            BoundShareInput::Query(Either::Left(nonrecursive_query)) => {
                let id = share.share_id;
                match self.share_cache.get(&id) {
                    None => {
                        let result = self
                            .plan_query(nonrecursive_query)?
                            .into_unordered_subplan();
                        let logical_share = LogicalShare::create(result);
                        self.share_cache.insert(id, logical_share.clone());
                        Ok(logical_share)
                    }
                    Some(result) => Ok(result.clone()),
                }
            }
            BoundShareInput::Query(Either::Right(recursive_union)) => self.plan_recursive_union(
                *recursive_union.base,
                *recursive_union.recursive,
                share.share_id,
            ),
            BoundShareInput::ChangeLog(relation) => {
                let id = share.share_id;
                let result = self.plan_changelog(relation)?;
                let logical_share = LogicalShare::create(result);
                self.share_cache.insert(id, logical_share.clone());
                Ok(logical_share)
            }
        }
    }

    pub(super) fn plan_watermark(&mut self, _watermark: BoundWatermark) -> Result<PlanRef> {
        todo!("plan watermark");
    }

    pub(super) fn plan_cte_ref(&mut self, cte_ref: BoundBackCteRef) -> Result<PlanRef> {
        let base = self.plan_set_expr(cte_ref.base, vec![], &[])?;
        Ok(LogicalCteRef::create(cte_ref.share_id, base))
    }

    pub(super) fn plan_gap_fill(&mut self, gap_fill: BoundGapFill) -> Result<PlanRef> {
        let input = self.plan_relation(gap_fill.input)?;
        Ok(LogicalGapFill::new(
            input,
            gap_fill.time_col,
            gap_fill.interval,
            gap_fill.fill_strategies,
        )
        .into())
    }

    fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result<Vec<DataType>> {
        let col_data_types = match relation {
            Relation::Source(s) => s
                .catalog
                .columns
                .iter()
                .map(|col| col.data_type().clone())
                .collect(),
            Relation::BaseTable(t) => t
                .table_catalog
                .columns
                .iter()
                .map(|col| col.data_type().clone())
                .collect(),
            Relation::Subquery(q) => q.query.schema().data_types(),
            Relation::Share(share) => share
                .input
                .fields()?
                .into_iter()
                .map(|(_, f)| f.data_type)
                .collect(),
            r => {
                return Err(ErrorCode::BindError(format!(
                    "Invalid input relation to tumble: {r:?}"
                ))
                .into());
            }
        };
        Ok(col_data_types)
    }

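    /// Plans a `TUMBLE` window table function, e.g.
    /// `SELECT * FROM TUMBLE(t, ts, INTERVAL '1 MINUTE')`.
    ///
    /// All input columns are kept, and two extra columns are projected:
    /// `window_start = tumble_start(time_col, size[, offset])` and
    /// `window_end = window_start + size`. The size (and optional offset) must be
    /// interval literals.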
    fn plan_tumble_window(
        &mut self,
        input: Relation,
        time_col: InputRef,
        args: Vec<ExprImpl>,
    ) -> Result<PlanRef> {
        let mut args = args.into_iter();
        let col_data_types: Vec<_> = Self::collect_col_data_types_for_tumble_window(&input)?;

        match (args.next(), args.next(), args.next()) {
            (Some(window_size @ ExprImpl::Literal(_)), None, None) => {
                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
                for (idx, col_dt) in col_data_types.iter().enumerate() {
                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
                }
                let window_start: ExprImpl = FunctionCall::new(
                    ExprType::TumbleStart,
                    vec![ExprImpl::InputRef(Box::new(time_col)), window_size.clone()],
                )?
                .into();
                let window_end =
                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
                        .into();
                exprs.push(window_start);
                exprs.push(window_end);
                let base = self.plan_relation(input)?;
                let project = LogicalProject::create(base, exprs);
                Ok(project)
            }
            (
                Some(window_size @ ExprImpl::Literal(_)),
                Some(window_offset @ ExprImpl::Literal(_)),
                None,
            ) => {
                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
                for (idx, col_dt) in col_data_types.iter().enumerate() {
                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
                }
                let window_start: ExprImpl = FunctionCall::new(
                    ExprType::TumbleStart,
                    vec![
                        ExprImpl::InputRef(Box::new(time_col)),
                        window_size.clone(),
                        window_offset,
                    ],
                )?
                .into();
                let window_end =
                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
                        .into();
                exprs.push(window_start);
                exprs.push(window_end);
                let base = self.plan_relation(input)?;
                let project = LogicalProject::create(base, exprs);
                Ok(project)
            }
            _ => Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
        }
    }

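    /// Plans a `HOP` window table function, e.g.
    /// `SELECT * FROM HOP(t, ts, INTERVAL '1 MINUTE', INTERVAL '5 MINUTES')`.
    ///
    /// The slide, size, and optional offset must be interval literals; the slide and size must
    /// be positive and the size must be an exact multiple of the slide.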
    fn plan_hop_window(
        &mut self,
        input: Relation,
        time_col: InputRef,
        args: Vec<ExprImpl>,
    ) -> Result<PlanRef> {
        let input = self.plan_relation(input)?;
        let mut args = args.into_iter();
        let Some((ExprImpl::Literal(window_slide), ExprImpl::Literal(window_size))) =
            args.next_tuple()
        else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };

        let Some(ScalarImpl::Interval(window_slide)) = *window_slide.get_data() else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };
        let Some(ScalarImpl::Interval(window_size)) = *window_size.get_data() else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };

        let window_offset = match (args.next(), args.next()) {
            (Some(ExprImpl::Literal(window_offset)), None) => match *window_offset.get_data() {
                Some(ScalarImpl::Interval(window_offset)) => window_offset,
                _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
            },
            (None, None) => Interval::from_month_day_usec(0, 0, 0),
            _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
        };

        if !window_size.is_positive() || !window_slide.is_positive() {
            return Err(ErrorCode::BindError(format!(
                "window_size {} and window_slide {} must be positive",
                window_size, window_slide
            ))
            .into());
        }

        if window_size.exact_div(&window_slide).is_none() {
            return Err(ErrorCode::BindError(format!(
                "Invalid arguments for HOP window function: window_size {} cannot be divided by window_slide {}",
                window_size, window_slide
            ))
            .into());
        }

        Ok(LogicalHopWindow::create(
            input,
            time_col,
            window_slide,
            window_size,
            window_offset,
        ))
    }
}