use std::collections::HashMap;
use std::default::Default;
use std::ops::Bound;
use std::vec;

use anyhow::anyhow;
use chrono::{MappedLocalTime, TimeZone};
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, Str, StrAssocArr, XmlNode};
use risingwave_common::catalog::{
    ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, FieldDisplay,
    OBJECT_ID_PLACEHOLDER, Schema, StreamJobStatus,
};
use risingwave_common::constants::log_store::v2::{
    KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::license::Feature;
use risingwave_common::types::{DataType, Interval, ScalarImpl, Timestamptz};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::scan_range::{ScanRange, is_full_range};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
use risingwave_expr::aggregate::PbAggKind;
use risingwave_expr::bail;
use risingwave_pb::plan_common::as_of::AsOfType;
use risingwave_pb::plan_common::{PbAsOf, as_of};
use risingwave_sqlparser::ast::AsOf;

use super::generic::{self, GenericPlanRef, PhysicalPlanRef};
use super::pretty_config;
use crate::PlanRef;
use crate::catalog::table_catalog::TableType;
use crate::catalog::{ColumnId, TableCatalog, TableId};
use crate::error::{ErrorCode, Result};
use crate::expr::InputRef;
use crate::optimizer::StreamScanType;
use crate::optimizer::plan_node::generic::Agg;
use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall};
use crate::optimizer::property::{Cardinality, Order, RequiredDist, WatermarkColumns};
use crate::utils::{Condition, IndexSet};

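/// A builder for [`TableCatalog`], mainly used for the internal tables of plan
/// nodes. Columns, primary key and other metadata are accumulated step by step
/// and then turned into a catalog by [`TableCatalogBuilder::build`].
///
/// A minimal usage sketch (hypothetical fields, not a doctest):
///
/// ```ignore
/// let mut builder = TableCatalogBuilder::default();
/// let key_idx = builder.add_column(&Field::with_name(DataType::Int64, "key"));
/// builder.add_column(&Field::with_name(DataType::Varchar, "value"));
/// builder.add_order_column(key_idx, OrderType::ascending());
/// let catalog = builder.build(vec![key_idx], 1);
/// ```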
#[derive(Default)]
pub struct TableCatalogBuilder {
    /// Columns added so far, in order.
    columns: Vec<ColumnCatalog>,
    /// Primary key columns with their order types.
    pk: Vec<ColumnOrder>,
    /// Indices of the value columns; defaults to all columns if unset.
    value_indices: Option<Vec<usize>>,
    /// Index of the vnode column, if one has been registered.
    vnode_col_idx: Option<usize>,
    /// Maps each column name to the next numeric suffix used to deduplicate it.
    column_names: HashMap<String, i32>,
    /// Bitset of watermark columns; defaults to empty if unset.
    watermark_columns: Option<FixedBitSet>,
    /// Distribution key expressed as indices into `pk`.
    dist_key_in_pk: Option<Vec<usize>>,
}

impl TableCatalogBuilder {
    /// Add a column from a [`Field`]. Returns the index of the column in the
    /// table.
    pub fn add_column(&mut self, field: &Field) -> usize {
        let column_idx = self.columns.len();
        let column_id = column_idx as i32;
        let mut column_desc = ColumnDesc::from_field_with_column_id(field, column_id);

        // Replace dots in the generated name with underscores.
        column_desc.name = column_desc.name.replace('.', "_");
        // Rename the column if its name collides with an existing one.
        self.avoid_duplicate_col_name(&mut column_desc);

        self.columns.push(ColumnCatalog {
            column_desc,
            is_hidden: false,
        });
        column_idx
    }

    /// Extend the table with the given columns, keeping their names (which must
    /// not collide with existing ones) and reassigning their column ids.
    /// Returns the indices of the newly added columns.
    pub fn extend_columns(&mut self, columns: &[ColumnCatalog]) -> Vec<usize> {
        let base_idx = self.columns.len();
        columns.iter().enumerate().for_each(|(i, col)| {
            assert!(!self.column_names.contains_key(col.name()));
            self.column_names.insert(col.name().to_owned(), 0);

            // Reset the column id to the index of the column in this table.
            let mut new_col = col.clone();
            new_col.column_desc.column_id = ColumnId::new((base_idx + i) as _);
            self.columns.push(new_col);
        });
        Vec::from_iter(base_idx..(base_idx + columns.len()))
    }

    /// Register a primary key column with its order type.
    pub fn add_order_column(&mut self, column_index: usize, order_type: OrderType) {
        self.pk.push(ColumnOrder::new(column_index, order_type));
    }

    /// Returns the number of primary key columns registered so far.
    pub fn get_current_pk_len(&self) -> usize {
        self.pk.len()
    }

    pub fn set_vnode_col_idx(&mut self, vnode_col_idx: usize) {
        self.vnode_col_idx = Some(vnode_col_idx);
    }

    pub fn set_value_indices(&mut self, value_indices: Vec<usize>) {
        self.value_indices = Some(value_indices);
    }

    pub fn set_dist_key_in_pk(&mut self, dist_key_in_pk: Vec<usize>) {
        self.dist_key_in_pk = Some(dist_key_in_pk);
    }

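    /// Deduplicate a column name by appending a numeric suffix. For each base
    /// name, `column_names` stores the next suffix to try.
    ///
    /// An illustrative trace (hypothetical column names): adding three columns
    /// named `a` yields
    ///
    /// ```text
    /// add "a" -> kept as "a"     column_names = {"a": 0}
    /// add "a" -> renamed "a_0"   column_names = {"a": 1, "a_0": 0}
    /// add "a" -> renamed "a_1"   column_names = {"a": 2, "a_0": 0, "a_1": 0}
    /// ```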
    fn avoid_duplicate_col_name(&mut self, column_desc: &mut ColumnDesc) {
        if let Some(old_identity) = self.column_names.get(&column_desc.name) {
            let column_name = column_desc.name.clone();
            let mut identity = *old_identity;
            loop {
                column_desc.name = format!("{}_{}", column_name, identity);
                identity += 1;
                if !self.column_names.contains_key(&column_desc.name) {
                    break;
                }
            }
            *self.column_names.get_mut(&column_name).unwrap() = identity;
        }
        self.column_names.insert(column_desc.name.clone(), 0);
    }

    /// Consume the builder and build the [`TableCatalog`]. Fields that cannot
    /// be decided at this point (id, name, owner, ...) are filled with
    /// placeholders; this builder always produces tables of type
    /// [`TableType::Internal`].
    pub fn build(self, distribution_key: Vec<usize>, read_prefix_len_hint: usize) -> TableCatalog {
        assert!(read_prefix_len_hint <= self.pk.len());
        let watermark_columns = match self.watermark_columns {
            Some(w) => w,
            None => FixedBitSet::with_capacity(self.columns.len()),
        };

        // If `dist_key_in_pk` was set, it must be consistent with the given
        // distribution key when mapped through the primary key.
        if let Some(dist_key_in_pk) = &self.dist_key_in_pk {
            let derived_dist_key = dist_key_in_pk
                .iter()
                .map(|idx| self.pk[*idx].column_index)
                .collect_vec();
            assert_eq!(
                derived_dist_key, distribution_key,
                "dist_key mismatch with dist_key_in_pk"
            );
        }

        TableCatalog {
            id: TableId::placeholder(),
            schema_id: 0,
            database_id: 0,
            associated_source_id: None,
            name: String::new(),
            dependent_relations: vec![],
            columns: self.columns.clone(),
            pk: self.pk,
            stream_key: vec![],
            distribution_key,
            table_type: TableType::Internal,
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            fragment_id: OBJECT_ID_PLACEHOLDER,
            dml_fragment_id: None,
            vnode_col_index: self.vnode_col_idx,
            row_id_index: None,
            value_indices: self
                .value_indices
                .unwrap_or_else(|| (0..self.columns.len()).collect_vec()),
            definition: "".into(),
            conflict_behavior: ConflictBehavior::NoCheck,
            version_column_index: None,
            read_prefix_len_hint,
            version: None,
            watermark_columns,
            dist_key_in_pk: self.dist_key_in_pk.unwrap_or_default(),
            cardinality: Cardinality::unknown(),
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            create_type: CreateType::Foreground,
            stream_job_status: StreamJobStatus::Creating,
            description: None,
            incoming_sinks: vec![],
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            retention_seconds: None,
            cdc_table_id: None,
            vnode_count: VnodeCount::Placeholder,
            webhook_info: None,
            job_id: None,
            engine: Engine::Hummock,
            clean_watermark_index_in_pk: None,
        }
    }

    pub fn columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }
}

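/// A compact, human-readable rendering of a plan node for `EXPLAIN`-style
/// output, produced as a [`XmlNode`] tree.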
pub trait Distill {
    fn distill<'a>(&self) -> XmlNode<'a>;

    fn distill_to_string(&self) -> String {
        let mut config = pretty_config();
        let mut output = String::with_capacity(2048);
        config.unicode(&mut output, &Pretty::Record(self.distill()));
        output
    }
}

/// Build a record [`XmlNode`] with the given name and fields but no children.
pub(super) fn childless_record<'a>(
    name: impl Into<Str<'a>>,
    fields: StrAssocArr<'a>,
) -> XmlNode<'a> {
    XmlNode::simple_record(name, fields, Default::default())
}

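/// Implement [`Distill`] for a plan node type by delegating to the
/// `distill_with_name` of its core (generic) field.
///
/// A hypothetical invocation (illustrative only, not from the original source):
///
/// ```ignore
/// impl_distill_by_unit!(StreamFoo, core, "StreamFoo");
/// ```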
macro_rules! impl_distill_by_unit {
    ($ty:ty, $core:ident, $name:expr) => {
        use pretty_xmlish::XmlNode;
        use $crate::optimizer::plan_node::generic::DistillUnit;
        use $crate::optimizer::plan_node::utils::Distill;
        impl Distill for $ty {
            fn distill<'a>(&self) -> XmlNode<'a> {
                self.$core.distill_with_name($name)
            }
        }
    };
}
pub(crate) use impl_distill_by_unit;

/// Pretty-print the column names of a schema as an array.
pub(crate) fn column_names_pretty<'a>(schema: &Schema) -> Pretty<'a> {
    let columns = (schema.fields.iter())
        .map(|f| f.name.clone())
        .map(Pretty::from)
        .collect();
    Pretty::Array(columns)
}

/// Pretty-print the watermark columns, grouped by watermark group. Returns
/// `None` if there are no watermark columns.
pub(crate) fn watermark_pretty<'a>(
    watermark_columns: &WatermarkColumns,
    schema: &Schema,
) -> Option<Pretty<'a>> {
    if watermark_columns.is_empty() {
        None
    } else {
        let groups = watermark_columns.grouped();
        let pretty_groups = groups
            .values()
            .map(|cols| {
                Pretty::Array(
                    cols.indices()
                        .map(|idx| FieldDisplay(schema.fields.get(idx).unwrap()))
                        .map(|d| Pretty::display(&d))
                        .collect::<Vec<_>>(),
                )
            })
            .collect();
        Some(Pretty::Array(pretty_groups))
    }
}

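/// A display helper for a list of column indices, resolving each index to its
/// field name in the given schema.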
#[derive(Clone, Copy)]
pub struct IndicesDisplay<'a> {
    pub indices: &'a [usize],
    pub schema: &'a Schema,
}

impl<'a> IndicesDisplay<'a> {
    /// Pretty-print the output indices of a join, or `"all"` if they are the
    /// identity mapping over the internal columns.
    pub fn from_join<'b, PlanRef: GenericPlanRef>(
        join: &'a generic::Join<PlanRef>,
        input_schema: &'a Schema,
    ) -> Pretty<'b> {
        let col_num = join.internal_column_num();
        let id = Self::from(&join.output_indices, col_num, input_schema);
        id.map_or_else(|| Pretty::from("all"), Self::distill)
    }

    /// Returns `None` if the indices are the identity mapping `0..col_num`.
    fn from(indices: &'a [usize], col_num: usize, schema: &'a Schema) -> Option<Self> {
        if indices.iter().copied().eq(0..col_num) {
            return None;
        }
        Some(Self { indices, schema })
    }

    pub fn distill<'b>(self) -> Pretty<'b> {
        let vec = self.indices.iter().map(|&i| {
            let name = self.schema.fields.get(i).unwrap().name.clone();
            Pretty::from(name)
        });
        Pretty::Array(vec.collect())
    }
}

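/// Wrap a DML plan so that it reports the total number of affected rows:
/// enforce a singleton distribution, then sum the per-partition row counts
/// with a simple aggregation.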
pub(crate) fn sum_affected_row(dml: PlanRef) -> Result<PlanRef> {
    let dml = RequiredDist::single().enforce_if_not_satisfies(dml, &Order::any())?;
    // Sum the affected-rows count (column 0) across all inputs.
    let sum_agg = PlanAggCall {
        agg_type: PbAggKind::Sum.into(),
        return_type: DataType::Int64,
        inputs: vec![InputRef::new(0, DataType::Int64)],
        distinct: false,
        order_by: vec![],
        filter: Condition::true_cond(),
        direct_args: vec![],
    };
    let agg = Agg::new(vec![sum_agg], IndexSet::empty(), dml);
    let batch_agg = BatchSimpleAgg::new(agg);
    Ok(batch_agg.into())
}

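/// Build a plan node name with optional bracketed properties, each guarded by
/// a boolean condition.
///
/// An illustrative expansion (hypothetical arguments):
///
/// ```text
/// plan_node_name!("StreamScan", { "backfill", true }, { "append_only", false })
///     // evaluates to "StreamScan [backfill]"
/// ```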
macro_rules! plan_node_name {
    ($name:literal $(, { $prop:literal, $cond:expr } )* $(,)?) => {
        {
            #[allow(unused_mut)]
            let mut properties: Vec<&str> = vec![];
            $( if $cond { properties.push($prop); } )*
            let mut name = $name.to_string();
            if !properties.is_empty() {
                name += " [";
                name += &properties.join(", ");
                name += "]";
            }
            name
        }
    };
}
pub(crate) use plan_node_name;

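/// Infer the catalog of the internal table backing a KV log store, given the
/// upstream plan and the downstream columns. The log store's predefined
/// columns come first and form the primary key; the payload columns follow.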
pub fn infer_kv_log_store_table_catalog_inner(
    input: &PlanRef,
    columns: &[ColumnCatalog],
) -> TableCatalog {
    let mut table_catalog_builder = TableCatalogBuilder::default();

    let mut value_indices =
        Vec::with_capacity(KV_LOG_STORE_PREDEFINED_COLUMNS.len() + input.schema().fields().len());

    for (name, data_type) in KV_LOG_STORE_PREDEFINED_COLUMNS {
        let idx = table_catalog_builder.add_column(&Field::with_name(data_type, name));
        value_indices.push(idx);
    }

    table_catalog_builder.set_vnode_col_idx(VNODE_COLUMN_INDEX);

    for (i, ordering) in PK_ORDERING.iter().enumerate() {
        table_catalog_builder.add_order_column(i, *ordering);
    }

    let read_prefix_len_hint = table_catalog_builder.get_current_pk_len();

    // The downstream columns are expected to match the input schema; warn (but
    // do not fail) on a mismatch.
    if columns.len() != input.schema().fields().len()
        || columns
            .iter()
            .zip_eq_fast(input.schema().fields())
            .any(|(c, f)| *c.data_type() != f.data_type())
    {
        tracing::warn!(
            "sink schema differs from upstream schema: sink columns: {:?}, input schema: {:?}.",
            columns,
            input.schema()
        );
    }
    for field in input.schema().fields() {
        let idx = table_catalog_builder.add_column(field);
        value_indices.push(idx);
    }
    table_catalog_builder.set_value_indices(value_indices);

    // Shift the distribution key indices past the predefined columns, which
    // are prepended to the payload.
    let dist_key = input
        .distribution()
        .dist_column_indices()
        .iter()
        .map(|idx| idx + KV_LOG_STORE_PREDEFINED_COLUMNS.len())
        .collect_vec();

    table_catalog_builder.build(dist_key, read_prefix_len_hint)
}

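/// Like [`infer_kv_log_store_table_catalog_inner`], but for the synced variant
/// of the KV log store, where the payload columns are given as [`Field`]s.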
pub fn infer_synced_kv_log_store_table_catalog_inner(
    input: &PlanRef,
    columns: &[Field],
) -> TableCatalog {
    let mut table_catalog_builder = TableCatalogBuilder::default();

    let mut value_indices =
        Vec::with_capacity(KV_LOG_STORE_PREDEFINED_COLUMNS.len() + columns.len());

    for (name, data_type) in KV_LOG_STORE_PREDEFINED_COLUMNS {
        let idx = table_catalog_builder.add_column(&Field::with_name(data_type, name));
        value_indices.push(idx);
    }

    table_catalog_builder.set_vnode_col_idx(VNODE_COLUMN_INDEX);

    for (i, ordering) in PK_ORDERING.iter().enumerate() {
        table_catalog_builder.add_order_column(i, *ordering);
    }

    let read_prefix_len_hint = table_catalog_builder.get_current_pk_len();

    let payload_indices = {
        let mut payload_indices = Vec::with_capacity(columns.len());
        for column in columns {
            let payload_index = table_catalog_builder.add_column(column);
            payload_indices.push(payload_index);
        }
        payload_indices
    };

    value_indices.extend(payload_indices);
    table_catalog_builder.set_value_indices(value_indices);

    // Shift the distribution key indices past the predefined columns.
    let dist_key = input
        .distribution()
        .dist_column_indices()
        .iter()
        .map(|idx| idx + KV_LOG_STORE_PREDEFINED_COLUMNS.len())
        .collect_vec();

    table_catalog_builder.build(dist_key, read_prefix_len_hint)
}

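/// Whether a plan can be created with background DDL: every leaf node must be
/// a source, a `Now` node, or a table scan whose backfill type supports
/// recovery.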
pub(crate) fn plan_can_use_background_ddl(plan: &PlanRef) -> bool {
    if plan.inputs().is_empty() {
        if plan.as_stream_source_scan().is_some()
            || plan.as_stream_now().is_some()
            || plan.as_stream_source().is_some()
        {
            true
        } else if let Some(scan) = plan.as_stream_table_scan() {
            matches!(
                scan.stream_scan_type(),
                StreamScanType::Backfill
                    | StreamScanType::ArrangementBackfill
                    | StreamScanType::CrossDbSnapshotBackfill
                    | StreamScanType::SnapshotBackfill
            )
        } else {
            false
        }
    } else {
        plan.inputs().iter().all(plan_can_use_background_ddl)
    }
}

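/// Convert an `AS OF` clause to its protobuf representation for time travel.
/// Only timestamp-based forms are supported: `AS OF PROCTIME()` and version
/// forms are rejected, and a relative interval is resolved against the current
/// time. Requires the `TimeTravel` license feature.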
pub fn to_pb_time_travel_as_of(a: &Option<AsOf>) -> Result<Option<PbAsOf>> {
    let Some(a) = a else {
        return Ok(None);
    };
    Feature::TimeTravel
        .check_available()
        .map_err(|e| anyhow::anyhow!(e))?;
    let as_of_type = match a {
        AsOf::ProcessTime => {
            return Err(ErrorCode::NotSupported(
                "do not support as of proctime".to_owned(),
                "please use as of timestamp".to_owned(),
            )
            .into());
        }
        AsOf::TimestampNum(ts) => AsOfType::Timestamp(as_of::Timestamp { timestamp: *ts }),
        AsOf::TimestampString(ts) => {
            let date_time = speedate::DateTime::parse_str_rfc3339(ts)
                .map_err(|_e| anyhow!("fail to parse timestamp"))?;
            let timestamp = if date_time.time.tz_offset.is_none() {
                // No explicit offset in the input: interpret the timestamp in
                // the session time zone.
                risingwave_expr::expr_context::TIME_ZONE::try_with(|set_time_zone| {
                    let tz =
                        Timestamptz::lookup_time_zone(set_time_zone).map_err(|e| anyhow!(e))?;
                    match tz.with_ymd_and_hms(
                        date_time.date.year.into(),
                        date_time.date.month.into(),
                        date_time.date.day.into(),
                        date_time.time.hour.into(),
                        date_time.time.minute.into(),
                        date_time.time.second.into(),
                    ) {
                        MappedLocalTime::Single(d) => Ok(d.timestamp()),
                        MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
                            Err(anyhow!(format!(
                                "failed to parse the timestamp {ts} with the specified time zone {tz}"
                            )))
                        }
                    }
                })??
            } else {
                date_time.timestamp_tz()
            };
            AsOfType::Timestamp(as_of::Timestamp { timestamp })
        }
        AsOf::VersionNum(_) | AsOf::VersionString(_) => {
            return Err(ErrorCode::NotSupported(
                "do not support as of version".to_owned(),
                "please use as of timestamp".to_owned(),
            )
            .into());
        }
        AsOf::ProcessTimeWithInterval((value, leading_field)) => {
            // Resolve `AS OF NOW() - INTERVAL ...` to an absolute timestamp.
            let interval = Interval::parse_with_fields(
                value,
                Some(crate::Binder::bind_date_time_field(leading_field.clone())),
            )
            .map_err(|_| anyhow!("fail to parse interval"))?;
            let interval_sec = (interval.epoch_in_micros() / 1_000_000) as i64;
            let timestamp = chrono::Utc::now()
                .timestamp()
                .checked_sub(interval_sec)
                .ok_or_else(|| anyhow!("invalid timestamp"))?;
            AsOfType::Timestamp(as_of::Timestamp { timestamp })
        }
    };
    Ok(Some(PbAsOf {
        as_of_type: Some(as_of_type),
    }))
}

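/// Convert an `AS OF` clause to Iceberg time travel info, resolving a
/// timestamp without an explicit offset in the given time zone. Process-time
/// forms are expected to be rejected earlier and are unreachable here.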
pub fn to_iceberg_time_travel_as_of(
    a: &Option<AsOf>,
    timezone: &String,
) -> Result<Option<IcebergTimeTravelInfo>> {
    Ok(match a {
        Some(AsOf::VersionNum(v)) => Some(IcebergTimeTravelInfo::Version(*v)),
        Some(AsOf::TimestampNum(ts)) => Some(IcebergTimeTravelInfo::TimestampMs(ts * 1000)),
        Some(AsOf::VersionString(_)) => {
            bail!("Unsupported version string in iceberg time travel")
        }
        Some(AsOf::TimestampString(ts)) => {
            let date_time = speedate::DateTime::parse_str_rfc3339(ts)
                .map_err(|_e| anyhow!("fail to parse timestamp"))?;
            let timestamp = if date_time.time.tz_offset.is_none() {
                // No explicit offset in the input: interpret the timestamp in
                // the given time zone.
                let tz = Timestamptz::lookup_time_zone(timezone).map_err(|e| anyhow!(e))?;
                match tz.with_ymd_and_hms(
                    date_time.date.year.into(),
                    date_time.date.month.into(),
                    date_time.date.day.into(),
                    date_time.time.hour.into(),
                    date_time.time.minute.into(),
                    date_time.time.second.into(),
                ) {
                    MappedLocalTime::Single(d) => Ok(d.timestamp()),
                    MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
                        Err(anyhow!(format!(
                            "failed to parse the timestamp {ts} with the specified time zone {tz}"
                        )))
                    }
                }?
            } else {
                date_time.timestamp_tz()
            };

            Some(IcebergTimeTravelInfo::TimestampMs(
                timestamp * 1000 + date_time.time.microsecond as i64 / 1000,
            ))
        }
        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
            unreachable!()
        }
        None => None,
    })
}

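/// Render scan ranges as human-readable strings for `EXPLAIN`: equality
/// conditions followed by at most one range condition per scan range. At most
/// 20 ranges are rendered; the rest are elided as `"..."`.
///
/// An illustrative rendering (hypothetical names; values are `Debug`-formatted):
///
/// ```text
/// eq_conds = [Some(1)], range = (Excluded([Some(5)]), Unbounded)
///     => "v1 = Int32(1) AND v2 > Int32(5)"
/// ```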
pub fn scan_ranges_as_strs(order_names: Vec<String>, scan_ranges: &[ScanRange]) -> Vec<String> {
    let mut range_strs = vec![];

    let explain_max_range = 20;
    for scan_range in scan_ranges.iter().take(explain_max_range) {
        // `zip` (rather than `zip_eq`) is intentional: `eq_conds` only covers
        // a prefix of the order keys.
        #[expect(clippy::disallowed_methods)]
        let mut range_str = scan_range
            .eq_conds
            .iter()
            .zip(order_names.iter())
            .map(|(v, name)| match v {
                Some(v) => format!("{} = {:?}", name, v),
                None => format!("{} IS NULL", name),
            })
            .collect_vec();

        let len = scan_range.eq_conds.len();
        if !is_full_range(&scan_range.range) {
            let bound_range_str = match (&scan_range.range.0, &scan_range.range.1) {
                (Bound::Unbounded, Bound::Unbounded) => unreachable!(),
                (Bound::Unbounded, ub) => ub_to_string(&order_names[len..], ub),
                (lb, Bound::Unbounded) => lb_to_string(&order_names[len..], lb),
                (lb, ub) => format!(
                    "{} AND {}",
                    lb_to_string(&order_names[len..], lb),
                    ub_to_string(&order_names[len..], ub)
                ),
            };
            range_str.push(bound_range_str);
        }
        range_strs.push(range_str.join(" AND "));
    }
    if scan_ranges.len() > explain_max_range {
        range_strs.push("...".to_owned());
    }
    range_strs
}

/// Render an upper bound as `name <= value` or `name < value`.
pub fn ub_to_string(order_names: &[String], ub: &Bound<Vec<Option<ScalarImpl>>>) -> String {
    match ub {
        Bound::Included(v) => {
            let (name, value) = row_to_string(order_names, v);
            format!("{} <= {}", name, value)
        }
        Bound::Excluded(v) => {
            let (name, value) = row_to_string(order_names, v);
            format!("{} < {}", name, value)
        }
        Bound::Unbounded => unreachable!(),
    }
}

/// Render a lower bound as `name >= value` or `name > value`.
pub fn lb_to_string(order_names: &[String], lb: &Bound<Vec<Option<ScalarImpl>>>) -> String {
    match lb {
        Bound::Included(v) => {
            let (name, value) = row_to_string(order_names, v);
            format!("{} >= {}", name, value)
        }
        Bound::Excluded(v) => {
            let (name, value) = row_to_string(order_names, v);
            format!("{} > {}", name, value)
        }
        Bound::Unbounded => unreachable!(),
    }
}

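/// Render a bound row as a `(names, values)` pair of strings. A single column
/// renders bare; multiple columns render as parenthesized tuples.
///
/// Illustrative outputs (hypothetical names; values are `Debug`-formatted):
///
/// ```text
/// ["a"]      + [Some(1)]       => ("a", "Int32(1)")
/// ["a", "b"] + [Some(1), None] => ("(a, b)", "(Int32(1), null)")
/// ```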
pub fn row_to_string(
    order_names: &[String],
    struct_values: &[Option<ScalarImpl>],
) -> (String, String) {
    let mut names = vec![];
    let mut values = vec![];
    // `zip` is intentional: the bound row may cover only a prefix of the keys.
    #[expect(clippy::disallowed_methods)]
    for (name, value) in order_names.iter().zip(struct_values.iter()) {
        names.push(name);
        match value {
            Some(v) => values.push(format!("{:?}", v)),
            None => values.push("null".to_owned()),
        }
    }
    if names.len() == 1 {
        (names[0].clone(), values[0].clone())
    } else {
        (
            format!("({})", names.iter().join(", ")),
            format!("({})", values.iter().join(", ")),
        )
    }
}