use std::collections::HashMap;
use std::default::Default;
use std::ops::Bound;
use std::vec;

use anyhow::anyhow;
use chrono::{MappedLocalTime, TimeZone};
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, Str, StrAssocArr, XmlNode};
use risingwave_common::catalog::{
    ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, FieldDisplay,
    OBJECT_ID_PLACEHOLDER, Schema, StreamJobStatus,
};
use risingwave_common::constants::log_store::v2::{
    KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::license::Feature;
use risingwave_common::types::{DataType, Interval, ScalarImpl, Timestamptz};
use risingwave_common::util::scan_range::{ScanRange, is_full_range};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
use risingwave_expr::aggregate::PbAggKind;
use risingwave_expr::bail;
use risingwave_pb::plan_common::as_of::AsOfType;
use risingwave_pb::plan_common::{PbAsOf, as_of};
use risingwave_sqlparser::ast::AsOf;

use super::generic::{self, GenericPlanRef, PhysicalPlanRef};
use super::pretty_config;
use crate::PlanRef;
use crate::catalog::table_catalog::TableType;
use crate::catalog::{ColumnId, TableCatalog, TableId};
use crate::error::{ErrorCode, Result};
use crate::expr::InputRef;
use crate::optimizer::StreamScanType;
use crate::optimizer::plan_node::generic::Agg;
use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall};
use crate::optimizer::property::{Cardinality, Order, RequiredDist, WatermarkColumns};
use crate::utils::{Condition, IndexSet};

/// Builder for the catalogs of internal tables (e.g. state tables of stateful
/// streaming executors). Columns must be added in the intended order, since a
/// column's index doubles as its column id.
#[derive(Default)]
pub struct TableCatalogBuilder {
    columns: Vec<ColumnCatalog>,
    pk: Vec<ColumnOrder>,
    value_indices: Option<Vec<usize>>,
    vnode_col_idx: Option<usize>,
    /// For each column name, the next numeric suffix to try when resolving a
    /// name clash.
    column_names: HashMap<String, i32>,
    watermark_columns: Option<FixedBitSet>,
    dist_key_in_pk: Option<Vec<usize>>,
}

impl TableCatalogBuilder {
    /// Add a column from a `Field`. Returns the index of the column in the table.
    pub fn add_column(&mut self, field: &Field) -> usize {
        let column_idx = self.columns.len();
        let column_id = column_idx as i32;
        let mut column_desc = ColumnDesc::from_field_with_column_id(field, column_id);

        // Replace dots with underscores so the name stays a plain identifier.
        column_desc.name = column_desc.name.replace('.', "_");
        self.avoid_duplicate_col_name(&mut column_desc);

        self.columns.push(ColumnCatalog {
            column_desc,
            is_hidden: false,
        });
        column_idx
    }

    /// Extend the table with the given columns, resetting their column ids to
    /// match their new positions. The input columns must not clash with any
    /// existing column name. Returns the indices of the appended columns.
    pub fn extend_columns(&mut self, columns: &[ColumnCatalog]) -> Vec<usize> {
        let base_idx = self.columns.len();
        columns.iter().enumerate().for_each(|(i, col)| {
            assert!(!self.column_names.contains_key(col.name()));
            self.column_names.insert(col.name().to_owned(), 0);

            let mut new_col = col.clone();
            new_col.column_desc.column_id = ColumnId::new((base_idx + i) as _);
            self.columns.push(new_col);
        });
        Vec::from_iter(base_idx..(base_idx + columns.len()))
    }

    /// Register a column as part of the (storage) primary key with the given
    /// order type. The key is encoded in the order the columns are added.
    pub fn add_order_column(&mut self, column_index: usize, order_type: OrderType) {
        self.pk.push(ColumnOrder::new(column_index, order_type));
    }

    /// Returns the number of primary-key columns added so far.
    pub fn get_current_pk_len(&self) -> usize {
        self.pk.len()
    }

    pub fn set_vnode_col_idx(&mut self, vnode_col_idx: usize) {
        self.vnode_col_idx = Some(vnode_col_idx);
    }

    pub fn set_value_indices(&mut self, value_indices: Vec<usize>) {
        self.value_indices = Some(value_indices);
    }

    pub fn set_dist_key_in_pk(&mut self, dist_key_in_pk: Vec<usize>) {
        self.dist_key_in_pk = Some(dist_key_in_pk);
    }

    /// If the name is already taken, append an increasing numeric suffix
    /// (`name_0`, `name_1`, ...) until it is unique, and remember where the
    /// search stopped so the next clash on the same name resumes from there.
    fn avoid_duplicate_col_name(&mut self, column_desc: &mut ColumnDesc) {
        if let Some(old_identity) = self.column_names.get(&column_desc.name) {
            let column_name = column_desc.name.clone();
            let mut identity = *old_identity;
            loop {
                column_desc.name = format!("{}_{}", column_name, identity);
                identity += 1;
                if !self.column_names.contains_key(&column_desc.name) {
                    break;
                }
            }
            *self.column_names.get_mut(&column_name).unwrap() = identity;
        }
        self.column_names.insert(column_desc.name.clone(), 0);
    }

    /// Consume the builder and build a `TableCatalog`. `read_prefix_len_hint`
    /// is the number of leading primary-key fields a typical read is expected
    /// to bind, which storage can use for optimizations such as bloom filters.
    pub fn build(self, distribution_key: Vec<usize>, read_prefix_len_hint: usize) -> TableCatalog {
        assert!(read_prefix_len_hint <= self.pk.len());
        let watermark_columns = match self.watermark_columns {
            Some(w) => w,
            None => FixedBitSet::with_capacity(self.columns.len()),
        };

        // If `dist_key_in_pk` was set, it must be consistent with the given
        // distribution key.
        if let Some(dist_key_in_pk) = &self.dist_key_in_pk {
            let derived_dist_key = dist_key_in_pk
                .iter()
                .map(|idx| self.pk[*idx].column_index)
                .collect_vec();
            assert_eq!(
                derived_dist_key, distribution_key,
                "dist_key mismatch with dist_key_in_pk"
            );
        }

        TableCatalog {
            id: TableId::placeholder(),
            schema_id: 0,
            database_id: 0,
            associated_source_id: None,
            name: String::new(),
            dependent_relations: vec![],
            columns: self.columns.clone(),
            pk: self.pk,
            stream_key: vec![],
            distribution_key,
            table_type: TableType::Internal,
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            fragment_id: OBJECT_ID_PLACEHOLDER,
            dml_fragment_id: None,
            vnode_col_index: self.vnode_col_idx,
            row_id_index: None,
            value_indices: self
                .value_indices
                .unwrap_or_else(|| (0..self.columns.len()).collect_vec()),
            definition: "".into(),
            conflict_behavior: ConflictBehavior::NoCheck,
            version_column_index: None,
            read_prefix_len_hint,
            version: None,
            watermark_columns,
            dist_key_in_pk: self.dist_key_in_pk.unwrap_or_default(),
            cardinality: Cardinality::unknown(),
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            create_type: CreateType::Foreground,
            stream_job_status: StreamJobStatus::Creating,
            description: None,
            incoming_sinks: vec![],
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            retention_seconds: None,
            cdc_table_id: None,
            vnode_count: VnodeCount::Placeholder,
            webhook_info: None,
            job_id: None,
            engine: Engine::Hummock,
            clean_watermark_index_in_pk: None,
        }
    }

    pub fn columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }
}
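
// A minimal sketch of how the builder is meant to be driven, written as a
// unit test; the column names and types are arbitrary example values, and it
// only exercises behavior defined in this file (name deduplication, dot
// rewriting, and `build`).
#[cfg(test)]
mod table_catalog_builder_tests {
    use super::*;

    #[test]
    fn dedups_column_names_and_builds() {
        let mut builder = TableCatalogBuilder::default();
        let v0 = builder.add_column(&Field::with_name(DataType::Int32, "v"));
        // A second "v" gets a numeric suffix to keep names unique.
        let v1 = builder.add_column(&Field::with_name(DataType::Int64, "v"));
        assert_eq!(builder.columns()[v1].name(), "v_0");
        // Dots are rewritten to underscores.
        let v2 = builder.add_column(&Field::with_name(DataType::Int64, "t.v"));
        assert_eq!(builder.columns()[v2].name(), "t_v");

        builder.add_order_column(v0, OrderType::ascending());
        let table = builder.build(vec![v0], 1);
        assert_eq!(table.pk.len(), 1);
        assert_eq!(table.columns.len(), 3);
    }
}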

/// Renders a plan node into a pretty-printable tree, mainly for `EXPLAIN`
/// output.
pub trait Distill {
    fn distill<'a>(&self) -> XmlNode<'a>;

    fn distill_to_string(&self) -> String {
        let mut config = pretty_config();
        let mut output = String::with_capacity(2048);
        config.unicode(&mut output, &Pretty::Record(self.distill()));
        output
    }
}

/// Builds an `XmlNode` record with the given name and fields but no children.
pub(super) fn childless_record<'a>(
    name: impl Into<Str<'a>>,
    fields: StrAssocArr<'a>,
) -> XmlNode<'a> {
    XmlNode::simple_record(name, fields, Default::default())
}
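
// A minimal sketch of implementing `Distill` by hand; the `Dummy` type is
// hypothetical and exists only to show that `distill_to_string` renders a
// childless record built with the helper above.
#[cfg(test)]
mod distill_tests {
    use super::*;

    struct Dummy;

    impl Distill for Dummy {
        fn distill<'a>(&self) -> XmlNode<'a> {
            childless_record("Dummy", vec![])
        }
    }

    #[test]
    fn distill_to_string_renders_the_record_name() {
        assert!(Dummy.distill_to_string().contains("Dummy"));
    }
}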

/// Implements `Distill` for a plan node type by delegating to the
/// `DistillUnit` implementation of its `core` field.
macro_rules! impl_distill_by_unit {
    ($ty:ty, $core:ident, $name:expr) => {
        use pretty_xmlish::XmlNode;
        use $crate::optimizer::plan_node::generic::DistillUnit;
        use $crate::optimizer::plan_node::utils::Distill;
        impl Distill for $ty {
            fn distill<'a>(&self) -> XmlNode<'a> {
                self.$core.distill_with_name($name)
            }
        }
    };
}
pub(crate) use impl_distill_by_unit;

pub(crate) fn column_names_pretty<'a>(schema: &Schema) -> Pretty<'a> {
    let columns = (schema.fields.iter())
        .map(|f| f.name.clone())
        .map(Pretty::from)
        .collect();
    Pretty::Array(columns)
}

pub(crate) fn watermark_pretty<'a>(
    watermark_columns: &WatermarkColumns,
    schema: &Schema,
) -> Option<Pretty<'a>> {
    if watermark_columns.is_empty() {
        None
    } else {
        let groups = watermark_columns.grouped();
        let pretty_groups = groups
            .values()
            .map(|cols| {
                Pretty::Array(
                    cols.indices()
                        .map(|idx| FieldDisplay(schema.fields.get(idx).unwrap()))
                        .map(|d| Pretty::display(&d))
                        .collect::<Vec<_>>(),
                )
            })
            .collect();
        Some(Pretty::Array(pretty_groups))
    }
}

#[derive(Clone, Copy)]
pub struct IndicesDisplay<'a> {
    pub indices: &'a [usize],
    pub schema: &'a Schema,
}

impl<'a> IndicesDisplay<'a> {
    pub fn from_join<'b, PlanRef: GenericPlanRef>(
        join: &'a generic::Join<PlanRef>,
        input_schema: &'a Schema,
    ) -> Pretty<'b> {
        let col_num = join.internal_column_num();
        let id = Self::from(&join.output_indices, col_num, input_schema);
        id.map_or_else(|| Pretty::from("all"), Self::distill)
    }

    /// Returns `None` if the indices are the identity mapping `0..col_num`,
    /// i.e. all columns are output in order and displaying them adds nothing.
    fn from(indices: &'a [usize], col_num: usize, schema: &'a Schema) -> Option<Self> {
        if indices.iter().copied().eq(0..col_num) {
            return None;
        }
        Some(Self { indices, schema })
    }

    pub fn distill<'b>(self) -> Pretty<'b> {
        let vec = self.indices.iter().map(|&i| {
            let name = self.schema.fields.get(i).unwrap().name.clone();
            Pretty::from(name)
        });
        Pretty::Array(vec.collect())
    }
}

/// Wraps a DML plan so that it outputs a single row: the sum of the affected
/// row counts, after enforcing a single-node distribution.
pub(crate) fn sum_affected_row(dml: PlanRef) -> Result<PlanRef> {
    let dml = RequiredDist::single().enforce_if_not_satisfies(dml, &Order::any())?;
    // Accumulate the per-partition affected-row counts into one sum.
    let sum_agg = PlanAggCall {
        agg_type: PbAggKind::Sum.into(),
        return_type: DataType::Int64,
        inputs: vec![InputRef::new(0, DataType::Int64)],
        distinct: false,
        order_by: vec![],
        filter: Condition::true_cond(),
        direct_args: vec![],
    };
    let agg = Agg::new(vec![sum_agg], IndexSet::empty(), dml);
    let batch_agg = BatchSimpleAgg::new(agg);
    Ok(batch_agg.into())
}

/// Builds a plan node name, appending the properties whose conditions hold
/// as a bracketed, comma-separated suffix, e.g. `StreamScan [backfill]`.
macro_rules! plan_node_name {
    ($name:literal $(, { $prop:literal, $cond:expr } )* $(,)?) => {
        {
            #[allow(unused_mut)]
            let mut properties: Vec<&str> = vec![];
            $( if $cond { properties.push($prop); } )*
            let mut name = $name.to_string();
            if !properties.is_empty() {
                name += " [";
                name += &properties.join(", ");
                name += "]";
            }
            name
        }
    };
}
pub(crate) use plan_node_name;
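
// A small illustrative test of the macro above; the node name and property
// strings are made up for the example.
#[cfg(test)]
mod plan_node_name_tests {
    #[test]
    fn appends_only_enabled_properties() {
        let name = plan_node_name!("StreamScan", { "backfill", true }, { "no_op", false });
        assert_eq!(name, "StreamScan [backfill]");
        // With no properties given, the bare name is returned.
        assert_eq!(plan_node_name!("BatchScan"), "BatchScan");
    }
}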

/// Infers the internal table catalog for a kv log store: the predefined log
/// store columns followed by the payload columns, with the predefined
/// primary-key ordering and the vnode column marked.
pub fn infer_kv_log_store_table_catalog_inner(
    input: &PlanRef,
    columns: &[ColumnCatalog],
) -> TableCatalog {
    let mut table_catalog_builder = TableCatalogBuilder::default();

    let mut value_indices =
        Vec::with_capacity(KV_LOG_STORE_PREDEFINED_COLUMNS.len() + columns.len());

    for (name, data_type) in KV_LOG_STORE_PREDEFINED_COLUMNS {
        let idx = table_catalog_builder.add_column(&Field::with_name(data_type, name));
        value_indices.push(idx);
    }

    table_catalog_builder.set_vnode_col_idx(VNODE_COLUMN_INDEX);

    for (i, ordering) in PK_ORDERING.iter().enumerate() {
        table_catalog_builder.add_order_column(i, *ordering);
    }

    let read_prefix_len_hint = table_catalog_builder.get_current_pk_len();

    let payload_indices = table_catalog_builder.extend_columns(columns);

    value_indices.extend(payload_indices);
    table_catalog_builder.set_value_indices(value_indices);

    // The distribution key mirrors the input's, shifted past the predefined
    // columns.
    let dist_key = input
        .distribution()
        .dist_column_indices()
        .iter()
        .map(|idx| idx + KV_LOG_STORE_PREDEFINED_COLUMNS.len())
        .collect_vec();

    table_catalog_builder.build(dist_key, read_prefix_len_hint)
}

/// Same as [`infer_kv_log_store_table_catalog_inner`], but takes the payload
/// columns as plain `Field`s and adds them through the builder (which
/// deduplicates names) instead of extending with ready-made column catalogs.
pub fn infer_synced_kv_log_store_table_catalog_inner(
    input: &PlanRef,
    columns: &[Field],
) -> TableCatalog {
    let mut table_catalog_builder = TableCatalogBuilder::default();

    let mut value_indices =
        Vec::with_capacity(KV_LOG_STORE_PREDEFINED_COLUMNS.len() + columns.len());

    for (name, data_type) in KV_LOG_STORE_PREDEFINED_COLUMNS {
        let idx = table_catalog_builder.add_column(&Field::with_name(data_type, name));
        value_indices.push(idx);
    }

    table_catalog_builder.set_vnode_col_idx(VNODE_COLUMN_INDEX);

    for (i, ordering) in PK_ORDERING.iter().enumerate() {
        table_catalog_builder.add_order_column(i, *ordering);
    }

    let read_prefix_len_hint = table_catalog_builder.get_current_pk_len();

    let payload_indices = {
        let mut payload_indices = Vec::with_capacity(columns.len());
        for column in columns {
            let payload_index = table_catalog_builder.add_column(column);
            payload_indices.push(payload_index);
        }
        payload_indices
    };

    value_indices.extend(payload_indices);
    table_catalog_builder.set_value_indices(value_indices);

    // The distribution key mirrors the input's, shifted past the predefined
    // columns.
    let dist_key = input
        .distribution()
        .dist_column_indices()
        .iter()
        .map(|idx| idx + KV_LOG_STORE_PREDEFINED_COLUMNS.len())
        .collect_vec();

    table_catalog_builder.build(dist_key, read_prefix_len_hint)
}

/// A plan can use background DDL only if every leaf node supports it: a
/// source scan, a `Now` node, a source, or a table scan whose stream scan
/// type is one of the backfill variants. Internal nodes simply require all
/// of their inputs to qualify.
pub(crate) fn plan_can_use_background_ddl(plan: &PlanRef) -> bool {
    if plan.inputs().is_empty() {
        if plan.as_stream_source_scan().is_some()
            || plan.as_stream_now().is_some()
            || plan.as_stream_source().is_some()
        {
            true
        } else if let Some(scan) = plan.as_stream_table_scan() {
            scan.stream_scan_type() == StreamScanType::Backfill
                || scan.stream_scan_type() == StreamScanType::ArrangementBackfill
                || scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill
        } else {
            false
        }
    } else {
        plan.inputs().iter().all(plan_can_use_background_ddl)
    }
}

/// Converts an optional `AS OF` clause into its protobuf form for time
/// travel, checking that the `TimeTravel` feature is licensed. Only
/// timestamp-based forms are supported; proctime and version-based forms
/// are rejected.
pub fn to_pb_time_travel_as_of(a: &Option<AsOf>) -> Result<Option<PbAsOf>> {
    let Some(a) = a else {
        return Ok(None);
    };
    Feature::TimeTravel
        .check_available()
        .map_err(|e| anyhow!(e))?;
    let as_of_type = match a {
        AsOf::ProcessTime => {
            return Err(ErrorCode::NotSupported(
                "do not support as of proctime".to_owned(),
                "please use as of timestamp".to_owned(),
            )
            .into());
        }
        AsOf::TimestampNum(ts) => AsOfType::Timestamp(as_of::Timestamp { timestamp: *ts }),
        AsOf::TimestampString(ts) => {
            let date_time = speedate::DateTime::parse_str_rfc3339(ts)
                .map_err(|_e| anyhow!("fail to parse timestamp"))?;
            let timestamp = if date_time.time.tz_offset.is_none() {
                // A timestamp without a time zone is interpreted in the
                // session time zone.
                risingwave_expr::expr_context::TIME_ZONE::try_with(|set_time_zone| {
                    let tz =
                        Timestamptz::lookup_time_zone(set_time_zone).map_err(|e| anyhow!(e))?;
                    match tz.with_ymd_and_hms(
                        date_time.date.year.into(),
                        date_time.date.month.into(),
                        date_time.date.day.into(),
                        date_time.time.hour.into(),
                        date_time.time.minute.into(),
                        date_time.time.second.into(),
                    ) {
                        MappedLocalTime::Single(d) => Ok(d.timestamp()),
                        MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
                            Err(anyhow!(format!(
                                "failed to parse the timestamp {ts} with the specified time zone {tz}"
                            )))
                        }
                    }
                })??
            } else {
                date_time.timestamp_tz()
            };
            AsOfType::Timestamp(as_of::Timestamp { timestamp })
        }
        AsOf::VersionNum(_) | AsOf::VersionString(_) => {
            return Err(ErrorCode::NotSupported(
                "do not support as of version".to_owned(),
                "please use as of timestamp".to_owned(),
            )
            .into());
        }
        AsOf::ProcessTimeWithInterval((value, leading_field)) => {
            // Interpret as the current wall-clock time minus the given interval.
            let interval = Interval::parse_with_fields(
                value,
                Some(crate::Binder::bind_date_time_field(leading_field.clone())),
            )
            .map_err(|_| anyhow!("fail to parse interval"))?;
            let interval_sec = (interval.epoch_in_micros() / 1_000_000) as i64;
            let timestamp = chrono::Utc::now()
                .timestamp()
                .checked_sub(interval_sec)
                .ok_or_else(|| anyhow!("invalid timestamp"))?;
            AsOfType::Timestamp(as_of::Timestamp { timestamp })
        }
    };
    Ok(Some(PbAsOf {
        as_of_type: Some(as_of_type),
    }))
}
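
// A trivially checkable property of the conversion above, as a unit test:
// an absent `AS OF` clause yields `None` before the time travel feature is
// even checked.
#[cfg(test)]
mod pb_as_of_tests {
    use super::*;

    #[test]
    fn absent_as_of_maps_to_none() {
        assert!(matches!(to_pb_time_travel_as_of(&None), Ok(None)));
    }
}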

/// Converts an optional `AS OF` clause into Iceberg time travel info. A
/// timestamp without a time zone is interpreted in the given `timezone`.
pub fn to_iceberg_time_travel_as_of(
    a: &Option<AsOf>,
    timezone: &String,
) -> Result<Option<IcebergTimeTravelInfo>> {
    Ok(match a {
        Some(AsOf::VersionNum(v)) => Some(IcebergTimeTravelInfo::Version(*v)),
        Some(AsOf::TimestampNum(ts)) => Some(IcebergTimeTravelInfo::TimestampMs(ts * 1000)),
        Some(AsOf::VersionString(_)) => {
            bail!("Unsupported version string in iceberg time travel")
        }
        Some(AsOf::TimestampString(ts)) => {
            let date_time = speedate::DateTime::parse_str_rfc3339(ts)
                .map_err(|_e| anyhow!("fail to parse timestamp"))?;
            let timestamp = if date_time.time.tz_offset.is_none() {
                let tz = Timestamptz::lookup_time_zone(timezone).map_err(|e| anyhow!(e))?;
                match tz.with_ymd_and_hms(
                    date_time.date.year.into(),
                    date_time.date.month.into(),
                    date_time.date.day.into(),
                    date_time.time.hour.into(),
                    date_time.time.minute.into(),
                    date_time.time.second.into(),
                ) {
                    MappedLocalTime::Single(d) => Ok(d.timestamp()),
                    MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
                        Err(anyhow!(format!(
                            "failed to parse the timestamp {ts} with the specified time zone {tz}"
                        )))
                    }
                }?
            } else {
                date_time.timestamp_tz()
            };

            Some(IcebergTimeTravelInfo::TimestampMs(
                timestamp * 1000 + date_time.time.microsecond as i64 / 1000,
            ))
        }
        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
            unreachable!()
        }
        None => None,
    })
}
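
// A minimal sketch of the direct conversions above, as a unit test; the
// version number and timestamp are arbitrary example values.
#[cfg(test)]
mod iceberg_as_of_tests {
    use super::*;

    #[test]
    fn version_and_timestamp_num_map_directly() {
        let tz = "UTC".to_owned();
        match to_iceberg_time_travel_as_of(&Some(AsOf::VersionNum(42)), &tz) {
            Ok(Some(IcebergTimeTravelInfo::Version(v))) => assert_eq!(v, 42),
            _ => panic!("expected a version time travel info"),
        }
        // `TimestampNum` is taken as seconds and converted to milliseconds.
        match to_iceberg_time_travel_as_of(&Some(AsOf::TimestampNum(7)), &tz) {
            Ok(Some(IcebergTimeTravelInfo::TimestampMs(ms))) => assert_eq!(ms, 7000),
            _ => panic!("expected a timestamp time travel info"),
        }
    }
}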

/// Renders scan ranges as human-readable predicate strings for `EXPLAIN`,
/// truncating the list with `...` after the first 20 ranges.
pub fn scan_ranges_as_strs(order_names: Vec<String>, scan_ranges: &Vec<ScanRange>) -> Vec<String> {
    let mut range_strs = vec![];

    let explain_max_range = 20;
    for scan_range in scan_ranges.iter().take(explain_max_range) {
        #[expect(clippy::disallowed_methods)]
        let mut range_str = scan_range
            .eq_conds
            .iter()
            .zip(order_names.iter())
            .map(|(v, name)| match v {
                Some(v) => format!("{} = {:?}", name, v),
                None => format!("{} IS NULL", name),
            })
            .collect_vec();

        // The (half-)open range applies to the columns after the equality
        // conditions.
        let len = scan_range.eq_conds.len();
        if !is_full_range(&scan_range.range) {
            let bound_range_str = match (&scan_range.range.0, &scan_range.range.1) {
                (Bound::Unbounded, Bound::Unbounded) => unreachable!(),
                (Bound::Unbounded, ub) => ub_to_string(&order_names[len..], ub),
                (lb, Bound::Unbounded) => lb_to_string(&order_names[len..], lb),
                (lb, ub) => format!(
                    "{} AND {}",
                    lb_to_string(&order_names[len..], lb),
                    ub_to_string(&order_names[len..], ub)
                ),
            };
            range_str.push(bound_range_str);
        }
        range_strs.push(range_str.join(" AND "));
    }
    if scan_ranges.len() > explain_max_range {
        range_strs.push("...".to_owned());
    }
    range_strs
}

pub fn ub_to_string(order_names: &[String], ub: &Bound<Vec<Option<ScalarImpl>>>) -> String {
    match ub {
        Bound::Included(v) => {
            let (name, value) = row_to_string(order_names, v);
            format!("{} <= {}", name, value)
        }
        Bound::Excluded(v) => {
            let (name, value) = row_to_string(order_names, v);
            format!("{} < {}", name, value)
        }
        Bound::Unbounded => unreachable!(),
    }
}

pub fn lb_to_string(order_names: &[String], lb: &Bound<Vec<Option<ScalarImpl>>>) -> String {
    match lb {
        Bound::Included(v) => {
            let (name, value) = row_to_string(order_names, v);
            format!("{} >= {}", name, value)
        }
        Bound::Excluded(v) => {
            let (name, value) = row_to_string(order_names, v);
            format!("{} > {}", name, value)
        }
        Bound::Unbounded => unreachable!(),
    }
}

/// Formats a prefix of ordered column names and the corresponding values as
/// a `(names, values)` pair of strings, without parentheses when there is
/// only a single column.
pub fn row_to_string(
    order_names: &[String],
    struct_values: &Vec<Option<ScalarImpl>>,
) -> (String, String) {
    let mut names = vec![];
    let mut values = vec![];
    #[expect(clippy::disallowed_methods)]
    for (name, value) in order_names.iter().zip(struct_values.iter()) {
        names.push(name);
        match value {
            Some(v) => values.push(format!("{:?}", v)),
            None => values.push("null".to_owned()),
        }
    }
    if names.len() == 1 {
        (names[0].clone(), values[0].clone())
    } else {
        (
            format!("({})", names.iter().join(", ")),
            format!("({})", values.iter().join(", ")),
        )
    }
}
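
// A minimal sketch of the bound formatting helpers, as a unit test; the
// column names and scalar values are arbitrary examples, and only the
// operator/tuple structure is asserted since scalar `Debug` output is an
// implementation detail.
#[cfg(test)]
mod bound_display_tests {
    use super::*;

    #[test]
    fn bounds_render_with_comparison_operators() {
        let names = vec!["a".to_owned(), "b".to_owned()];
        // Included lower bounds render as `>=`.
        let lb = Bound::Included(vec![Some(ScalarImpl::Int32(1))]);
        assert!(lb_to_string(&names, &lb).starts_with("a >= "));
        // Multi-column excluded upper bounds render as tuples with `<`,
        // and missing values render as `null`.
        let ub = Bound::Excluded(vec![Some(ScalarImpl::Int32(9)), None]);
        let s = ub_to_string(&names, &ub);
        assert!(s.starts_with("(a, b) < ("));
        assert!(s.ends_with(", null)"));
    }
}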