use std::collections::HashMap;
use std::ops::Deref;
use std::rc::Rc;

use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, Engine, Field, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, Schema,
};
use risingwave_common::types::{DataType, Interval, ScalarImpl};
use risingwave_common::{bail, bail_not_implemented};
use risingwave_sqlparser::ast::AsOf;

use crate::binder::{
    BoundBaseTable, BoundGapFill, BoundJoin, BoundShare, BoundShareInput, BoundSource,
    BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind,
};
use crate::error::{ErrorCode, Result};
use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{
    LogicalApply, LogicalGapFill, LogicalHopWindow, LogicalJoin, LogicalPlanRef as PlanRef,
    LogicalProject, LogicalScan, LogicalShare, LogicalSource, LogicalSysScan, LogicalTableFunction,
    LogicalValues,
};
use crate::optimizer::property::Cardinality;
use crate::planner::{PlanFor, Planner};
use crate::utils::Condition;

const ERROR_WINDOW_SIZE_ARG: &str =
    "The size arg of window table function should be an interval literal.";

impl Planner {
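    /// Plans a bound [`Relation`] by dispatching to the planner for each concrete variant.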
    pub fn plan_relation(&mut self, relation: Relation) -> Result<PlanRef> {
        match relation {
            Relation::BaseTable(t) => self.plan_base_table(&t),
            Relation::SystemTable(st) => self.plan_sys_table(*st),
            Relation::Subquery(q) => Ok(self.plan_query(q.query)?.into_unordered_subplan()),
            Relation::Join(join) => self.plan_join(*join),
            Relation::Apply(join) => self.plan_apply(*join),
            Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf),
            Relation::Source(s) => self.plan_source(*s),
            Relation::TableFunction {
                expr: tf,
                with_ordinality,
            } => self.plan_table_function(tf, with_ordinality),
            Relation::Watermark(tf) => self.plan_watermark(*tf),
            Relation::Share(share) => self.plan_share(*share),
            Relation::GapFill(bound_gap_fill) => self.plan_gap_fill(*bound_gap_fill),
        }
    }

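    /// Plans a scan over a system table as a [`LogicalSysScan`] with unknown cardinality.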
    pub(crate) fn plan_sys_table(&mut self, sys_table: BoundSystemTable) -> Result<PlanRef> {
        Ok(LogicalSysScan::create(
            sys_table.sys_table_catalog,
            self.ctx(),
            Cardinality::unknown(),
        )
        .into())
    }

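    /// Plans a scan over a user table.
    ///
    /// Hummock tables become a [`LogicalScan`]. Iceberg engine tables are either scanned
    /// directly or planned through their backing iceberg source, depending on the plan
    /// target and whether the table is append-only.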
    pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result<PlanRef> {
        let as_of = base_table.as_of.clone();
        let scan = LogicalScan::from_base_table(base_table, self.ctx(), as_of.clone());

        match base_table.table_catalog.engine {
            Engine::Hummock => {
                match as_of {
                    None
                    | Some(AsOf::ProcessTime)
                    | Some(AsOf::TimestampNum(_))
                    | Some(AsOf::TimestampString(_))
                    | Some(AsOf::ProcessTimeWithInterval(_)) => {}
                    Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
                        bail_not_implemented!("As Of Version is not supported yet.")
                    }
                };
                Ok(scan.into())
            }
            Engine::Iceberg => {
                let is_append_only = base_table.table_catalog.append_only;
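                // Decide whether to read the table through its backing iceberg source:
                // batch DQL always does, the internal stream plan of the iceberg engine
                // never does, and other stream/batch plans do so only for append-only tables.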
                let use_iceberg_source = match (self.plan_for(), is_append_only) {
                    (PlanFor::StreamIcebergEngineInternal, _) => false,
                    (PlanFor::BatchDql, _) => true,
                    (PlanFor::Stream | PlanFor::Batch, is_append_only) => is_append_only,
                };

                if !use_iceberg_source {
                    match as_of {
                        None
                        | Some(AsOf::VersionNum(_))
                        | Some(AsOf::TimestampString(_))
                        | Some(AsOf::TimestampNum(_)) => {}
                        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                            bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                        }
                        Some(AsOf::VersionString(_)) => {
                            bail_not_implemented!("As Of Version is not supported yet.")
                        }
                    }
                    Ok(scan.into())
                } else {
                    match as_of {
                        None
                        | Some(AsOf::VersionNum(_))
                        | Some(AsOf::TimestampString(_))
                        | Some(AsOf::TimestampNum(_)) => {}
                        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                            bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                        }
                        Some(AsOf::VersionString(_)) => {
                            bail_not_implemented!("As Of Version is not supported yet.")
                        }
                    }
                    let opt_ctx = self.ctx();
                    let session = opt_ctx.session_ctx();
                    let db_name = &session.database();
                    let catalog_reader = session.env().catalog_reader().read_guard();
                    let mut source_catalog = None;
                    for schema in catalog_reader.iter_schemas(db_name).unwrap() {
                        if schema
                            .get_table_by_id(base_table.table_catalog.id)
                            .is_some()
                        {
                            source_catalog = schema.get_source_by_name(
                                &base_table.table_catalog.iceberg_source_name().unwrap(),
                            );
                            break;
                        }
                    }
                    if let Some(source_catalog) = source_catalog {
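                        // Project the source columns into the table's column order: map the
                        // row id column to the iceberg row id column, cast columns whose
                        // types differ, and fill columns missing from the source with NULLs.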
                        let column_map: HashMap<String, (usize, ColumnCatalog)> = source_catalog
                            .columns
                            .clone()
                            .into_iter()
                            .enumerate()
                            .map(|(i, column)| (column.name().to_owned(), (i, column)))
                            .collect();
                        let exprs = scan
                            .table()
                            .column_schema()
                            .fields()
                            .iter()
                            .map(|field| {
                                let source_field_name = if field.name == ROW_ID_COLUMN_NAME {
                                    RISINGWAVE_ICEBERG_ROW_ID
                                } else {
                                    &field.name
                                };
                                if let Some((i, source_column)) = column_map.get(source_field_name)
                                {
                                    if source_column.column_desc.data_type == field.data_type {
                                        ExprImpl::InputRef(
                                            InputRef::new(*i, field.data_type.clone()).into(),
                                        )
                                    } else {
                                        let mut input_ref = ExprImpl::InputRef(
                                            InputRef::new(
                                                *i,
                                                source_column.column_desc.data_type.clone(),
                                            )
                                            .into(),
                                        );
                                        FunctionCall::cast_mut(
                                            &mut input_ref,
                                            &field.data_type(),
                                            CastContext::Explicit,
                                        )
                                        .unwrap();
                                        input_ref
                                    }
                                } else {
                                    ExprImpl::Literal(
                                        Literal::new(None, field.data_type.clone()).into(),
                                    )
                                }
                            })
                            .collect_vec();
                        let logical_source = LogicalSource::with_catalog(
                            Rc::new(source_catalog.deref().clone()),
                            SourceNodeKind::CreateMViewOrBatch,
                            self.ctx(),
                            as_of,
                        )?;
                        Ok(LogicalProject::new(logical_source.into(), exprs).into())
                    } else {
                        bail!(
                            "failed to plan an iceberg engine table: {}. Can't find the corresponding iceberg source. Maybe you need to recreate the table",
                            base_table.table_catalog.name()
                        );
                    }
                }
            }
        }
    }

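    /// Plans a scan over a source as a [`LogicalSource`].
    ///
    /// Rejects shared CDC sources (which must be consumed via a table) and, for stream
    /// plans, sources without a primary key.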
    pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
        if source.is_shareable_cdc_connector() {
            Err(ErrorCode::InternalError(
                "Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source. HINT: create TABLE from the source instead.".to_owned(),
            )
            .into())
        } else {
            let as_of = source.as_of.clone();
            match as_of {
                None
                | Some(AsOf::VersionNum(_))
                | Some(AsOf::TimestampString(_))
                | Some(AsOf::TimestampNum(_)) => {}
                Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                    bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                }
                Some(AsOf::VersionString(_)) => {
                    bail_not_implemented!("As Of Version is not supported yet.")
                }
            }

            if matches!(self.plan_for(), PlanFor::Stream) {
                let has_pk =
                    source.catalog.row_id_index.is_some() || !source.catalog.pk_col_ids.is_empty();
                if !has_pk {
                    let is_iceberg = source.catalog.is_iceberg_connector();
                    debug_assert!(is_iceberg);
                    if is_iceberg {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from an iceberg source without a primary key.\nThe iceberg source might be created in an older version of RisingWave. Please try recreating the source.\nSource: {:?}",
                            source.catalog
                        ))
                        .into());
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from a source without a primary key.
This is a bug. We would appreciate a bug report at:
https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml

source: {:?}",
                            source.catalog
                        ))
                        .into());
                    }
                }
            }
            Ok(LogicalSource::with_catalog(
                Rc::new(source.catalog),
                SourceNodeKind::CreateMViewOrBatch,
                self.ctx(),
                as_of,
            )?
            .into())
        }
    }

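    /// Plans a regular (non-correlated) join as a [`LogicalJoin`].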
    pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
        let left = self.plan_relation(join.left)?;
        let right = self.plan_relation(join.right)?;
        let join_type = join.join_type;
        let on_clause = join.cond;
        if on_clause.has_subquery() {
            bail_not_implemented!("Subquery in join on condition");
        } else {
            Ok(LogicalJoin::create(left, right, join_type, on_clause))
        }
    }

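    /// Plans a correlated join as a [`LogicalApply`], collecting the correlated column
    /// indices from the right side before planning both inputs.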
    pub(super) fn plan_apply(&mut self, mut join: BoundJoin) -> Result<PlanRef> {
        let join_type = join.join_type;
        let on_clause = join.cond;
        if on_clause.has_subquery() {
            bail_not_implemented!("Subquery in join on condition");
        }

        let correlated_id = self.ctx.next_correlated_id();
        let correlated_indices = join
            .right
            .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
        let left = self.plan_relation(join.left)?;
        let right = self.plan_relation(join.right)?;

        Ok(LogicalApply::create(
            left,
            right,
            join_type,
            Condition::with_expr(on_clause),
            correlated_id,
            correlated_indices,
            false,
        ))
    }

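    /// Plans a `TUMBLE` or `HOP` window table function.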
    pub(super) fn plan_window_table_function(
        &mut self,
        table_function: BoundWindowTableFunction,
    ) -> Result<PlanRef> {
        use WindowTableFunctionKind::*;
        match table_function.kind {
            Tumble => self.plan_tumble_window(
                table_function.input,
                table_function.time_col,
                table_function.args,
            ),
            Hop => self.plan_hop_window(
                table_function.input,
                table_function.time_col,
                table_function.args,
            ),
        }
    }

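    /// Plans a table function in the `FROM` clause.
    ///
    /// Non-table-function expressions are wrapped in a single-row `VALUES` and projected,
    /// flattening a struct return type into one column per struct field.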
    pub(super) fn plan_table_function(
        &mut self,
        table_function: ExprImpl,
        with_ordinality: bool,
    ) -> Result<PlanRef> {
        match table_function {
            ExprImpl::TableFunction(tf) => {
                Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into())
            }
            expr => {
                let schema = Schema {
                    fields: vec![Field::unnamed(expr.return_type())],
                };
                let expr_return_type = expr.return_type();
                let root = LogicalValues::create(vec![vec![expr]], schema, self.ctx());
                let input_ref = ExprImpl::from(InputRef::new(0, expr_return_type.clone()));
                let mut exprs = if let DataType::Struct(st) = expr_return_type {
                    st.iter()
                        .enumerate()
                        .map(|(i, (_, ty))| {
                            let idx = ExprImpl::literal_int(i.try_into().unwrap());
                            let args = vec![input_ref.clone(), idx];
                            FunctionCall::new_unchecked(ExprType::Field, args, ty.clone()).into()
                        })
                        .collect()
                } else {
                    vec![input_ref]
                };
                if with_ordinality {
                    exprs.push(ExprImpl::literal_bigint(1));
                }
                Ok(LogicalProject::create(root, exprs))
            }
        }
    }

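    /// Plans a shared relation (a query or a changelog) as a [`LogicalShare`], caching the
    /// result by share id so repeated references reuse the same plan.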
    pub(super) fn plan_share(&mut self, share: BoundShare) -> Result<PlanRef> {
        match share.input {
            BoundShareInput::Query(query) => {
                let id = share.share_id;
                match self.share_cache.get(&id) {
                    None => {
                        let result = self.plan_query(query)?.into_unordered_subplan();
                        let logical_share = LogicalShare::create(result);
                        self.share_cache.insert(id, logical_share.clone());
                        Ok(logical_share)
                    }
                    Some(result) => Ok(result.clone()),
                }
            }
            BoundShareInput::ChangeLog(relation) => {
                let id = share.share_id;
                let result = self.plan_changelog(relation)?;
                let logical_share = LogicalShare::create(result);
                self.share_cache.insert(id, logical_share.clone());
                Ok(logical_share)
            }
        }
    }

    pub(super) fn plan_watermark(&mut self, _watermark: BoundWatermark) -> Result<PlanRef> {
        todo!("plan watermark");
    }

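    /// Plans a gap-fill relation as a [`LogicalGapFill`] over its planned input.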
    pub(super) fn plan_gap_fill(&mut self, gap_fill: BoundGapFill) -> Result<PlanRef> {
        let input = self.plan_relation(gap_fill.input)?;
        Ok(LogicalGapFill::new(
            input,
            gap_fill.time_col,
            gap_fill.interval,
            gap_fill.fill_strategies,
        )
        .into())
    }

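    /// Collects the column data types of the input relation of a `TUMBLE` window,
    /// rejecting relation kinds that `TUMBLE` does not support.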
    fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result<Vec<DataType>> {
        let col_data_types = match relation {
            Relation::Source(s) => s
                .catalog
                .columns
                .iter()
                .map(|col| col.data_type().clone())
                .collect(),
            Relation::BaseTable(t) => t
                .table_catalog
                .columns
                .iter()
                .map(|col| col.data_type().clone())
                .collect(),
            Relation::Subquery(q) => q.query.schema().data_types(),
            Relation::Share(share) => share
                .input
                .fields()?
                .into_iter()
                .map(|(_, f)| f.data_type)
                .collect(),
            r => {
                return Err(ErrorCode::BindError(format!(
                    "Invalid input relation to tumble: {r:?}"
                ))
                .into());
            }
        };
        Ok(col_data_types)
    }

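    /// Plans a `TUMBLE` window as a projection that appends `window_start` and `window_end`
    /// columns computed from the time column, the window size, and an optional offset.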
    fn plan_tumble_window(
        &mut self,
        input: Relation,
        time_col: InputRef,
        args: Vec<ExprImpl>,
    ) -> Result<PlanRef> {
        let mut args = args.into_iter();
        let col_data_types: Vec<_> = Self::collect_col_data_types_for_tumble_window(&input)?;

        match (args.next(), args.next(), args.next()) {
            (Some(window_size @ ExprImpl::Literal(_)), None, None) => {
                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
                for (idx, col_dt) in col_data_types.iter().enumerate() {
                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
                }
                let window_start: ExprImpl = FunctionCall::new(
                    ExprType::TumbleStart,
                    vec![ExprImpl::InputRef(Box::new(time_col)), window_size.clone()],
                )?
                .into();
                let window_end =
                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
                        .into();
                exprs.push(window_start);
                exprs.push(window_end);
                let base = self.plan_relation(input)?;
                let project = LogicalProject::create(base, exprs);
                Ok(project)
            }
            (
                Some(window_size @ ExprImpl::Literal(_)),
                Some(window_offset @ ExprImpl::Literal(_)),
                None,
            ) => {
                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
                for (idx, col_dt) in col_data_types.iter().enumerate() {
                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
                }
                let window_start: ExprImpl = FunctionCall::new(
                    ExprType::TumbleStart,
                    vec![
                        ExprImpl::InputRef(Box::new(time_col)),
                        window_size.clone(),
                        window_offset,
                    ],
                )?
                .into();
                let window_end =
                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
                        .into();
                exprs.push(window_start);
                exprs.push(window_end);
                let base = self.plan_relation(input)?;
                let project = LogicalProject::create(base, exprs);
                Ok(project)
            }
            _ => Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
        }
    }

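    /// Plans a `HOP` window as a [`LogicalHopWindow`], validating that the slide, size, and
    /// optional offset are interval literals, that size and slide are positive, and that the
    /// size is a multiple of the slide.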
    fn plan_hop_window(
        &mut self,
        input: Relation,
        time_col: InputRef,
        args: Vec<ExprImpl>,
    ) -> Result<PlanRef> {
        let input = self.plan_relation(input)?;
        let mut args = args.into_iter();
        let Some((ExprImpl::Literal(window_slide), ExprImpl::Literal(window_size))) =
            args.next_tuple()
        else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };

        let Some(ScalarImpl::Interval(window_slide)) = *window_slide.get_data() else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };
        let Some(ScalarImpl::Interval(window_size)) = *window_size.get_data() else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };

        let window_offset = match (args.next(), args.next()) {
            (Some(ExprImpl::Literal(window_offset)), None) => match *window_offset.get_data() {
                Some(ScalarImpl::Interval(window_offset)) => window_offset,
                _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
            },
            (None, None) => Interval::from_month_day_usec(0, 0, 0),
            _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
        };

        if !window_size.is_positive() || !window_slide.is_positive() {
            return Err(ErrorCode::BindError(format!(
                "window_size {} and window_slide {} must be positive",
                window_size, window_slide
            ))
            .into());
        }

        if window_size.exact_div(&window_slide).is_none() {
            return Err(ErrorCode::BindError(format!(
                "Invalid arguments for HOP window function: window_size {} cannot be divided by window_slide {}",
                window_size, window_slide
            ))
            .into());
        }

        Ok(LogicalHopWindow::create(
            input,
            time_col,
            window_slide,
            window_size,
            window_offset,
        ))
    }
}