1use std::collections::HashMap;
16use std::default::Default;
17use std::ops::Bound;
18use std::vec;
19
20use anyhow::anyhow;
21use chrono::{MappedLocalTime, TimeZone};
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use pretty_xmlish::{Pretty, Str, StrAssocArr, XmlNode};
25use risingwave_common::catalog::{
26 ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, FieldDisplay, Schema,
27 StreamJobStatus,
28};
29use risingwave_common::constants::log_store::v2::{
30 KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX,
31};
32use risingwave_common::hash::VnodeCount;
33use risingwave_common::license::Feature;
34use risingwave_common::types::{DataType, Interval, ScalarImpl, Timestamptz};
35use risingwave_common::util::iter_util::ZipEqFast;
36use risingwave_common::util::scan_range::{ScanRange, is_full_range};
37use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
38use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
39use risingwave_expr::aggregate::PbAggKind;
40use risingwave_expr::bail;
41use risingwave_sqlparser::ast::AsOf;
42
43use super::generic::{self, GenericPlanRef, PhysicalPlanRef};
44use super::{BatchPlanRef, StreamPlanRef, pretty_config};
45use crate::catalog::table_catalog::TableType;
46use crate::catalog::{ColumnId, FragmentId, TableCatalog, TableId};
47use crate::error::{ErrorCode, Result};
48use crate::expr::InputRef;
49use crate::optimizer::StreamScanType;
50use crate::optimizer::plan_node::generic::Agg;
51use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall};
52use crate::optimizer::property::{Cardinality, Order, RequiredDist, WatermarkColumns};
53use crate::utils::{Condition, IndexSet};
54
/// Builder for internal (state) table catalogs used by plan nodes.
///
/// Columns are appended one by one and their ids are assigned by insertion
/// order; the finished catalog is produced by [`TableCatalogBuilder::build`].
#[derive(Default)]
pub struct TableCatalogBuilder {
    // Columns added so far; the column id of each equals its index.
    columns: Vec<ColumnCatalog>,
    // Primary key: (column index, order type) pairs, in key order.
    pk: Vec<ColumnOrder>,
    // Indices of value columns; `None` means "all columns" at build time.
    value_indices: Option<Vec<usize>>,
    // Index of the vnode column, if any.
    vnode_col_idx: Option<usize>,
    // Maps a column name to the next numeric suffix to try when
    // disambiguating duplicates (see `avoid_duplicate_col_name`).
    column_names: HashMap<String, i32>,
    // Bitset of watermark columns; `None` defaults to empty at build time.
    watermark_columns: Option<FixedBitSet>,
    // Distribution key expressed as positions within the pk, if provided.
    dist_key_in_pk: Option<Vec<usize>>,
    // Column indices used for watermark-based state cleaning.
    clean_watermark_indices: Vec<usize>,
}
68
69impl TableCatalogBuilder {
72 pub fn add_column(&mut self, field: &Field) -> usize {
74 let column_idx = self.columns.len();
75 let column_id = column_idx as i32;
76 let mut column_desc = ColumnDesc::from_field_with_column_id(field, column_id);
78
79 column_desc.name = column_desc.name.replace('.', "_");
81 self.avoid_duplicate_col_name(&mut column_desc);
83
84 self.columns.push(ColumnCatalog {
85 column_desc,
86 is_hidden: false,
88 });
89 column_idx
90 }
91
92 pub fn extend_columns(&mut self, columns: &[ColumnCatalog]) -> Vec<usize> {
96 let base_idx = self.columns.len();
97 columns.iter().enumerate().for_each(|(i, col)| {
98 assert!(!self.column_names.contains_key(col.name()));
99 self.column_names.insert(col.name().to_owned(), 0);
100
101 let mut new_col = col.clone();
103 new_col.column_desc.column_id = ColumnId::new((base_idx + i) as _);
104 self.columns.push(new_col);
105 });
106 Vec::from_iter(base_idx..(base_idx + columns.len()))
107 }
108
109 pub fn add_order_column(&mut self, column_index: usize, order_type: OrderType) {
112 self.pk.push(ColumnOrder::new(column_index, order_type));
113 }
114
115 pub fn get_current_pk_len(&self) -> usize {
117 self.pk.len()
118 }
119
120 pub fn set_vnode_col_idx(&mut self, vnode_col_idx: usize) {
121 self.vnode_col_idx = Some(vnode_col_idx);
122 }
123
124 pub fn set_value_indices(&mut self, value_indices: Vec<usize>) {
125 self.value_indices = Some(value_indices);
126 }
127
128 pub fn set_dist_key_in_pk(&mut self, dist_key_in_pk: Vec<usize>) {
129 self.dist_key_in_pk = Some(dist_key_in_pk);
130 }
131
132 pub fn set_clean_watermark_indices(&mut self, clean_watermark_indices: Vec<usize>) {
135 self.clean_watermark_indices = clean_watermark_indices;
136 }
137
138 fn avoid_duplicate_col_name(&mut self, column_desc: &mut ColumnDesc) {
141 if let Some(old_identity) = self.column_names.get(&column_desc.name) {
142 let column_name = column_desc.name.clone();
143 let mut identity = *old_identity;
144 loop {
145 column_desc.name = format!("{}_{}", column_name, identity);
146 identity += 1;
147 if !self.column_names.contains_key(&column_desc.name) {
148 break;
149 }
150 }
151 *self.column_names.get_mut(&column_name).unwrap() = identity;
152 }
153 self.column_names.insert(column_desc.name.clone(), 0);
154 }
155
156 pub fn build(self, distribution_key: Vec<usize>, read_prefix_len_hint: usize) -> TableCatalog {
160 assert!(read_prefix_len_hint <= self.pk.len());
161 let watermark_columns = match self.watermark_columns {
162 Some(w) => w,
163 None => FixedBitSet::with_capacity(self.columns.len()),
164 };
165
166 if let Some(dist_key_in_pk) = &self.dist_key_in_pk {
170 let derived_dist_key = dist_key_in_pk
171 .iter()
172 .map(|idx| self.pk[*idx].column_index)
173 .collect_vec();
174 assert_eq!(
175 derived_dist_key, distribution_key,
176 "dist_key mismatch with dist_key_in_pk"
177 );
178 }
179
180 TableCatalog {
181 id: TableId::placeholder(),
182 schema_id: 0.into(),
183 database_id: 0.into(),
184 associated_source_id: None,
185 name: String::new(),
186 columns: self.columns.clone(),
187 pk: self.pk,
188 stream_key: vec![],
189 distribution_key,
190 table_type: TableType::Internal,
193 append_only: false,
194 owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
195 fragment_id: FragmentId::placeholder(),
196 dml_fragment_id: None,
197 vnode_col_index: self.vnode_col_idx,
198 row_id_index: None,
199 value_indices: self
200 .value_indices
201 .unwrap_or_else(|| (0..self.columns.len()).collect_vec()),
202 definition: "".into(),
203 conflict_behavior: ConflictBehavior::NoCheck,
204 version_column_indices: vec![],
205 read_prefix_len_hint,
206 version: None, watermark_columns,
208 dist_key_in_pk: self.dist_key_in_pk.unwrap_or_default(),
209 cardinality: Cardinality::unknown(), created_at_epoch: None,
211 initialized_at_epoch: None,
212 create_type: CreateType::Foreground,
215 stream_job_status: StreamJobStatus::Creating,
216 description: None,
217 initialized_at_cluster_version: None,
218 created_at_cluster_version: None,
219 retention_seconds: None,
220 cdc_table_id: None,
221 vnode_count: VnodeCount::Placeholder, webhook_info: None,
223 job_id: None,
224 engine: Engine::Hummock,
225 clean_watermark_index_in_pk: None,
226 clean_watermark_indices: self.clean_watermark_indices,
227 refreshable: false, vector_index_info: None,
229 cdc_table_type: None,
230 }
231 }
232
233 pub fn columns(&self) -> &[ColumnCatalog] {
234 &self.columns
235 }
236}
237
/// Pretty-prints a plan node as an [`XmlNode`] tree for `EXPLAIN`-style output.
pub trait Distill {
    /// Renders this node into an XML-ish record.
    fn distill<'a>(&self) -> XmlNode<'a>;

    /// Renders [`Self::distill`] to a string using the shared pretty config
    /// with unicode output.
    fn distill_to_string(&self) -> String {
        let mut config = pretty_config();
        let mut output = String::with_capacity(2048);
        config.unicode(&mut output, &Pretty::Record(self.distill()));
        output
    }
}
249
250pub(super) fn childless_record<'a>(
251 name: impl Into<Str<'a>>,
252 fields: StrAssocArr<'a>,
253) -> XmlNode<'a> {
254 XmlNode::simple_record(name, fields, Default::default())
255}
256
/// Implements [`Distill`] for `$ty` by delegating to its `$core` field's
/// `distill_with_name`, labeling the record with `$name`.
macro_rules! impl_distill_by_unit {
    ($ty:ty, $core:ident, $name:expr) => {
        use pretty_xmlish::XmlNode;
        use $crate::optimizer::plan_node::generic::DistillUnit;
        use $crate::optimizer::plan_node::utils::Distill;
        impl Distill for $ty {
            fn distill<'a>(&self) -> XmlNode<'a> {
                self.$core.distill_with_name($name)
            }
        }
    };
}
pub(crate) use impl_distill_by_unit;
270
271pub(crate) fn column_names_pretty<'a>(schema: &Schema) -> Pretty<'a> {
272 let columns = (schema.fields.iter())
273 .map(|f| f.name.clone())
274 .map(Pretty::from)
275 .collect();
276 Pretty::Array(columns)
277}
278
279pub(crate) fn watermark_pretty<'a>(
280 watermark_columns: &WatermarkColumns,
281 schema: &Schema,
282) -> Option<Pretty<'a>> {
283 if watermark_columns.is_empty() {
284 None
285 } else {
286 let groups = watermark_columns.grouped();
287 let pretty_groups = groups
288 .values()
289 .map(|cols| {
290 Pretty::Array(
291 cols.indices()
292 .map(|idx| FieldDisplay(schema.fields.get(idx).unwrap()))
293 .map(|d| Pretty::display(&d))
294 .collect::<Vec<_>>(),
295 )
296 })
297 .collect();
298 Some(Pretty::Array(pretty_groups))
299 }
300}
301
/// Helper for pretty-printing a non-trivial projection of schema columns
/// by their names.
#[derive(Clone, Copy)]
pub struct IndicesDisplay<'a> {
    // Column indices to display, resolved against `schema`.
    pub indices: &'a [usize],
    // Schema providing the column names.
    pub schema: &'a Schema,
}
307
308impl<'a> IndicesDisplay<'a> {
309 pub fn from_join<'b>(
311 join: &'a generic::Join<impl GenericPlanRef>,
312 input_schema: &'a Schema,
313 ) -> Pretty<'b> {
314 let col_num = join.internal_column_num();
315 let id = Self::from(&join.output_indices, col_num, input_schema);
316 id.map_or_else(|| Pretty::from("all"), Self::distill)
317 }
318
319 fn from(indices: &'a [usize], col_num: usize, schema: &'a Schema) -> Option<Self> {
321 if indices.iter().copied().eq(0..col_num) {
322 return None;
323 }
324 Some(Self { indices, schema })
325 }
326
327 pub fn distill<'b>(self) -> Pretty<'b> {
328 let vec = self.indices.iter().map(|&i| {
329 let name = self.schema.fields.get(i).unwrap().name.clone();
330 Pretty::from(name)
331 });
332 Pretty::Array(vec.collect())
333 }
334}
335
336pub(crate) fn sum_affected_row(dml: BatchPlanRef) -> Result<BatchPlanRef> {
337 let dml = RequiredDist::single().batch_enforce_if_not_satisfies(dml, &Order::any())?;
338 let sum_agg = PlanAggCall {
340 agg_type: PbAggKind::Sum.into(),
341 return_type: DataType::Int64,
342 inputs: vec![InputRef::new(0, DataType::Int64)],
343 distinct: false,
344 order_by: vec![],
345 filter: Condition::true_cond(),
346 direct_args: vec![],
347 };
348 let agg = Agg::new(vec![sum_agg], IndexSet::empty(), dml);
349 let batch_agg = BatchSimpleAgg::new(agg);
350 Ok(batch_agg.into())
351}
352
/// Builds a plan-node display name from a base literal plus optional
/// properties that are appended as `name [prop1, prop2]` when their
/// conditions hold, e.g. `plan_node_name!("Join", { "append_only", cond })`.
macro_rules! plan_node_name {
    ($name:literal $(, { $prop:literal, $cond:expr } )* $(,)?) => {
        {
            #[allow(unused_mut)]
            let mut properties: Vec<&str> = vec![];
            $( if $cond { properties.push($prop); } )*
            let mut name = $name.to_string();
            if !properties.is_empty() {
                name += " [";
                name += &properties.join(", ");
                name += "]";
            }
            name
        }
    };
}
373use risingwave_pb::common::{PbBatchQueryEpoch, batch_query_epoch};
374
375pub fn infer_kv_log_store_table_catalog_inner(
376 input: &StreamPlanRef,
377 columns: &[ColumnCatalog],
378) -> TableCatalog {
379 let mut table_catalog_builder = TableCatalogBuilder::default();
380
381 let mut value_indices =
382 Vec::with_capacity(KV_LOG_STORE_PREDEFINED_COLUMNS.len() + input.schema().fields().len());
383
384 for (name, data_type) in KV_LOG_STORE_PREDEFINED_COLUMNS {
385 let indice = table_catalog_builder.add_column(&Field::with_name(data_type, name));
386 value_indices.push(indice);
387 }
388
389 table_catalog_builder.set_vnode_col_idx(VNODE_COLUMN_INDEX);
390
391 for (i, ordering) in PK_ORDERING.iter().enumerate() {
392 table_catalog_builder.add_order_column(i, *ordering);
393 }
394
395 let read_prefix_len_hint = table_catalog_builder.get_current_pk_len();
396
397 if columns.len() != input.schema().fields().len()
398 || columns
399 .iter()
400 .zip_eq_fast(input.schema().fields())
401 .any(|(c, f)| *c.data_type() != f.data_type())
402 {
403 tracing::warn!(
404 "sink schema different with upstream schema: sink columns: {:?}, input schema: {:?}.",
405 columns,
406 input.schema()
407 );
408 }
409 for field in input.schema().fields() {
410 let indice = table_catalog_builder.add_column(field);
411 value_indices.push(indice);
412 }
413 table_catalog_builder.set_value_indices(value_indices);
414
415 let dist_key = input
417 .distribution()
418 .dist_column_indices()
419 .iter()
420 .map(|idx| idx + KV_LOG_STORE_PREDEFINED_COLUMNS.len())
421 .collect_vec();
422
423 table_catalog_builder.build(dist_key, read_prefix_len_hint)
424}
425
426pub fn infer_synced_kv_log_store_table_catalog_inner(
427 input: &StreamPlanRef,
428 columns: &[Field],
429) -> TableCatalog {
430 let mut table_catalog_builder = TableCatalogBuilder::default();
431
432 let mut value_indices =
433 Vec::with_capacity(KV_LOG_STORE_PREDEFINED_COLUMNS.len() + columns.len());
434
435 for (name, data_type) in KV_LOG_STORE_PREDEFINED_COLUMNS {
436 let indice = table_catalog_builder.add_column(&Field::with_name(data_type, name));
437 value_indices.push(indice);
438 }
439
440 table_catalog_builder.set_vnode_col_idx(VNODE_COLUMN_INDEX);
441
442 for (i, ordering) in PK_ORDERING.iter().enumerate() {
443 table_catalog_builder.add_order_column(i, *ordering);
444 }
445
446 let read_prefix_len_hint = table_catalog_builder.get_current_pk_len();
447
448 let payload_indices = {
449 let mut payload_indices = Vec::with_capacity(columns.len());
450 for column in columns {
451 let payload_index = table_catalog_builder.add_column(column);
452 payload_indices.push(payload_index);
453 }
454 payload_indices
455 };
456
457 value_indices.extend(payload_indices);
458 table_catalog_builder.set_value_indices(value_indices);
459
460 let dist_key = input
462 .distribution()
463 .dist_column_indices()
464 .iter()
465 .map(|idx| idx + KV_LOG_STORE_PREDEFINED_COLUMNS.len())
466 .collect_vec();
467
468 table_catalog_builder.build(dist_key, read_prefix_len_hint)
469}
470
471pub(crate) fn plan_can_use_background_ddl(plan: &StreamPlanRef) -> bool {
476 if plan.inputs().is_empty() {
477 if plan.as_stream_source_scan().is_some()
478 || plan.as_stream_now().is_some()
479 || plan.as_stream_source().is_some()
480 {
481 true
482 } else if let Some(scan) = plan.as_stream_table_scan() {
483 scan.stream_scan_type() == StreamScanType::Backfill
484 || scan.stream_scan_type() == StreamScanType::ArrangementBackfill
485 || scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill
486 || scan.stream_scan_type() == StreamScanType::SnapshotBackfill
487 } else {
488 false
489 }
490 } else {
491 assert!(!plan.inputs().is_empty());
492 plan.inputs().iter().all(plan_can_use_background_ddl)
493 }
494}
495
/// Converts a unix timestamp in seconds into a RisingWave epoch.
///
/// The input is bumped by one second before conversion — presumably so the
/// resulting epoch covers all events within second `ts`; TODO confirm intent.
/// Negative timestamps are clamped to 0 millis by the `unwrap_or(0)`.
///
/// # Panics
/// Panics if `ts == i64::MAX` (the `+1` overflows) or if the millisecond
/// value overflows `u64` in `checked_mul(1000)`.
pub fn unix_timestamp_sec_to_epoch(ts: i64) -> risingwave_common::util::epoch::Epoch {
    let ts = ts.checked_add(1).unwrap();
    risingwave_common::util::epoch::Epoch::from_unix_millis_or_earliest(
        u64::try_from(ts).unwrap_or(0).checked_mul(1000).unwrap(),
    )
}
502
/// Converts an optional `AS OF` clause into a batch-query time-travel epoch.
///
/// Returns `Ok(None)` when no `AS OF` clause is given. Requires the
/// `TimeTravel` license feature. Only timestamp-based forms are supported;
/// `AS OF` proctime and version forms yield `NotSupported`.
pub fn to_batch_query_epoch(a: &Option<AsOf>) -> Result<Option<PbBatchQueryEpoch>> {
    let Some(a) = a else {
        return Ok(None);
    };
    Feature::TimeTravel.check_available()?;
    // Resolve the clause to a unix timestamp in seconds.
    let timestamp = match a {
        AsOf::ProcessTime => {
            return Err(ErrorCode::NotSupported(
                "do not support as of proctime".to_owned(),
                "please use as of timestamp".to_owned(),
            )
            .into());
        }
        AsOf::TimestampNum(ts) => *ts,
        AsOf::TimestampString(ts) => {
            let date_time = speedate::DateTime::parse_str_rfc3339(ts)
                .map_err(|_e| anyhow!("fail to parse timestamp"))?;
            if date_time.time.tz_offset.is_none() {
                // No explicit offset: interpret the wall-clock time in the
                // session time zone from the expression context.
                risingwave_expr::expr_context::TIME_ZONE::try_with(|set_time_zone| {
                    let tz =
                        Timestamptz::lookup_time_zone(set_time_zone).map_err(|e| anyhow!(e))?;
                    match tz.with_ymd_and_hms(
                        date_time.date.year.into(),
                        date_time.date.month.into(),
                        date_time.date.day.into(),
                        date_time.time.hour.into(),
                        date_time.time.minute.into(),
                        date_time.time.second.into(),
                    ) {
                        MappedLocalTime::Single(d) => Ok(d.timestamp()),
                        // DST gaps/overlaps make the local time ambiguous or
                        // nonexistent — reject rather than guess.
                        MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
                            Err(anyhow!(format!(
                                "failed to parse the timestamp {ts} with the specified time zone {tz}"
                            )))
                        }
                    }
                })??
            } else {
                date_time.timestamp_tz()
            }
        }
        AsOf::VersionNum(_) | AsOf::VersionString(_) => {
            return Err(ErrorCode::NotSupported(
                "do not support as of version".to_owned(),
                "please use as of timestamp".to_owned(),
            )
            .into());
        }
        AsOf::ProcessTimeWithInterval((value, leading_field)) => {
            // `AS OF` "now minus interval": subtract the interval (in whole
            // seconds) from the current wall clock.
            let interval = Interval::parse_with_fields(
                value,
                Some(crate::Binder::bind_date_time_field(*leading_field)),
            )
            .map_err(|_| anyhow!("fail to parse interval"))?;
            let interval_sec = (interval.epoch_in_micros() / 1_000_000) as i64;
            chrono::Utc::now()
                .timestamp()
                .checked_sub(interval_sec)
                .ok_or_else(|| anyhow!("invalid timestamp"))?
        }
    };
    Ok(Some(PbBatchQueryEpoch {
        epoch: Some(batch_query_epoch::PbEpoch::TimeTravel(
            unix_timestamp_sec_to_epoch(timestamp).0,
        )),
    }))
}
571
/// Converts an optional `AS OF` clause into Iceberg time-travel info.
///
/// Version numbers map to snapshot versions; timestamps map to millisecond
/// precision. Timestamps without an explicit offset are interpreted in the
/// given `timezone`. Proctime forms are expected to be rejected earlier and
/// are `unreachable!` here.
pub fn to_iceberg_time_travel_as_of(
    a: &Option<AsOf>,
    timezone: &String,
) -> Result<Option<IcebergTimeTravelInfo>> {
    Ok(match a {
        Some(AsOf::VersionNum(v)) => Some(IcebergTimeTravelInfo::Version(*v)),
        Some(AsOf::TimestampNum(ts)) => Some(IcebergTimeTravelInfo::TimestampMs(ts * 1000)),
        Some(AsOf::VersionString(_)) => {
            bail!("Unsupported version string in iceberg time travel")
        }
        Some(AsOf::TimestampString(ts)) => {
            let date_time = speedate::DateTime::parse_str_rfc3339(ts)
                .map_err(|_e| anyhow!("fail to parse timestamp"))?;
            let timestamp = if date_time.time.tz_offset.is_none() {
                // No explicit offset: interpret in the provided time zone.
                let tz = Timestamptz::lookup_time_zone(timezone).map_err(|e| anyhow!(e))?;
                match tz.with_ymd_and_hms(
                    date_time.date.year.into(),
                    date_time.date.month.into(),
                    date_time.date.day.into(),
                    date_time.time.hour.into(),
                    date_time.time.minute.into(),
                    date_time.time.second.into(),
                ) {
                    MappedLocalTime::Single(d) => Ok(d.timestamp()),
                    // DST gaps/overlaps make the local time ambiguous or
                    // nonexistent — reject rather than guess.
                    MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
                        Err(anyhow!(format!(
                            "failed to parse the timestamp {ts} with the specified time zone {tz}"
                        )))
                    }
                }?
            } else {
                date_time.timestamp_tz()
            };

            // Seconds to millis, re-attaching sub-second precision.
            Some(IcebergTimeTravelInfo::TimestampMs(
                timestamp * 1000 + date_time.time.microsecond as i64 / 1000,
            ))
        }
        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
            unreachable!()
        }
        None => None,
    })
}
617
618pub fn scan_ranges_as_strs(order_names: Vec<String>, scan_ranges: &Vec<ScanRange>) -> Vec<String> {
619 let mut range_strs = vec![];
620
621 let explain_max_range = 20;
622 for scan_range in scan_ranges.iter().take(explain_max_range) {
623 #[expect(clippy::disallowed_methods)]
624 let mut range_str = scan_range
625 .eq_conds
626 .iter()
627 .zip(order_names.iter())
628 .map(|(v, name)| match v {
629 Some(v) => format!("{} = {:?}", name, v),
630 None => format!("{} IS NULL", name),
631 })
632 .collect_vec();
633
634 let len = scan_range.eq_conds.len();
635 if !is_full_range(&scan_range.range) {
636 let bound_range_str = match (&scan_range.range.0, &scan_range.range.1) {
637 (Bound::Unbounded, Bound::Unbounded) => unreachable!(),
638 (Bound::Unbounded, ub) => ub_to_string(&order_names[len..], ub),
639 (lb, Bound::Unbounded) => lb_to_string(&order_names[len..], lb),
640 (lb, ub) => format!(
641 "{} AND {}",
642 lb_to_string(&order_names[len..], lb),
643 ub_to_string(&order_names[len..], ub)
644 ),
645 };
646 range_str.push(bound_range_str);
647 }
648 range_strs.push(range_str.join(" AND "));
649 }
650 if scan_ranges.len() > explain_max_range {
651 range_strs.push("...".to_owned());
652 }
653 range_strs
654}
655
656pub fn ub_to_string(order_names: &[String], ub: &Bound<Vec<Option<ScalarImpl>>>) -> String {
657 match ub {
658 Bound::Included(v) => {
659 let (name, value) = row_to_string(order_names, v);
660 format!("{} <= {}", name, value)
661 }
662 Bound::Excluded(v) => {
663 let (name, value) = row_to_string(order_names, v);
664 format!("{} < {}", name, value)
665 }
666 Bound::Unbounded => unreachable!(),
667 }
668}
669
670pub fn lb_to_string(order_names: &[String], lb: &Bound<Vec<Option<ScalarImpl>>>) -> String {
671 match lb {
672 Bound::Included(v) => {
673 let (name, value) = row_to_string(order_names, v);
674 format!("{} >= {}", name, value)
675 }
676 Bound::Excluded(v) => {
677 let (name, value) = row_to_string(order_names, v);
678 format!("{} > {}", name, value)
679 }
680 Bound::Unbounded => unreachable!(),
681 }
682}
683
684pub fn row_to_string(
685 order_names: &[String],
686 struct_values: &Vec<Option<ScalarImpl>>,
687) -> (String, String) {
688 let mut names = vec![];
689 let mut values = vec![];
690 #[expect(clippy::disallowed_methods)]
691 for (name, value) in order_names.iter().zip(struct_values.iter()) {
692 names.push(name);
693 match value {
694 Some(v) => values.push(format!("{:?}", v)),
695 None => values.push("null".to_owned()),
696 }
697 }
698 if names.len() == 1 {
699 (names[0].clone(), values[0].clone())
700 } else {
701 (
702 format!("({})", names.iter().join(", ")),
703 format!("({})", values.iter().join(", ")),
704 )
705 }
706}