1use std::collections::HashMap;
16use std::default::Default;
17use std::ops::Bound;
18use std::vec;
19
20use anyhow::anyhow;
21use chrono::{MappedLocalTime, TimeZone};
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use pretty_xmlish::{Pretty, Str, StrAssocArr, XmlNode};
25use risingwave_common::catalog::{
26    ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, FieldDisplay,
27    OBJECT_ID_PLACEHOLDER, Schema, StreamJobStatus,
28};
29use risingwave_common::constants::log_store::v2::{
30    KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX,
31};
32use risingwave_common::hash::VnodeCount;
33use risingwave_common::license::Feature;
34use risingwave_common::types::{DataType, Interval, ScalarImpl, Timestamptz};
35use risingwave_common::util::iter_util::ZipEqFast;
36use risingwave_common::util::scan_range::{ScanRange, is_full_range};
37use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
38use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
39use risingwave_expr::aggregate::PbAggKind;
40use risingwave_expr::bail;
41use risingwave_sqlparser::ast::AsOf;
42
43use super::generic::{self, GenericPlanRef, PhysicalPlanRef};
44use super::{BatchPlanRef, StreamPlanRef, pretty_config};
45use crate::catalog::table_catalog::TableType;
46use crate::catalog::{ColumnId, TableCatalog, TableId};
47use crate::error::{ErrorCode, Result};
48use crate::expr::InputRef;
49use crate::optimizer::StreamScanType;
50use crate::optimizer::plan_node::generic::Agg;
51use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall};
52use crate::optimizer::property::{Cardinality, Order, RequiredDist, WatermarkColumns};
53use crate::utils::{Condition, IndexSet};
54
/// Incrementally assembles the [`TableCatalog`] of an internal table.
///
/// Columns, primary-key order columns and optional metadata are accumulated through the
/// builder's methods; `build` then produces a catalog whose identity fields (id, name,
/// schema/database ids, fragment id, ...) are placeholders.
#[derive(Default)]
pub struct TableCatalogBuilder {
    /// Columns added so far; each column's id equals its position in this vector.
    columns: Vec<ColumnCatalog>,
    /// Primary-key columns (by column index) together with their order types.
    pk: Vec<ColumnOrder>,
    /// Indices of the value columns; `None` makes `build` treat every column as a value column.
    value_indices: Option<Vec<usize>>,
    /// Index of the vnode column, if one was set.
    vnode_col_idx: Option<usize>,
    /// Every column name in use, mapped to the next numeric suffix to try when another
    /// column with that name is added.
    column_names: HashMap<String, i32>,
    /// Watermark columns; `None` makes `build` use an empty bit set.
    watermark_columns: Option<FixedBitSet>,
    /// Distribution key expressed as positions within `pk`; when set, `build` asserts that
    /// it maps to the distribution key passed in.
    dist_key_in_pk: Option<Vec<usize>>,
}
66
67impl TableCatalogBuilder {
70    pub fn add_column(&mut self, field: &Field) -> usize {
72        let column_idx = self.columns.len();
73        let column_id = column_idx as i32;
74        let mut column_desc = ColumnDesc::from_field_with_column_id(field, column_id);
76
77        column_desc.name = column_desc.name.replace('.', "_");
79        self.avoid_duplicate_col_name(&mut column_desc);
81
82        self.columns.push(ColumnCatalog {
83            column_desc,
84            is_hidden: false,
86        });
87        column_idx
88    }
89
90    pub fn extend_columns(&mut self, columns: &[ColumnCatalog]) -> Vec<usize> {
94        let base_idx = self.columns.len();
95        columns.iter().enumerate().for_each(|(i, col)| {
96            assert!(!self.column_names.contains_key(col.name()));
97            self.column_names.insert(col.name().to_owned(), 0);
98
99            let mut new_col = col.clone();
101            new_col.column_desc.column_id = ColumnId::new((base_idx + i) as _);
102            self.columns.push(new_col);
103        });
104        Vec::from_iter(base_idx..(base_idx + columns.len()))
105    }
106
107    pub fn add_order_column(&mut self, column_index: usize, order_type: OrderType) {
110        self.pk.push(ColumnOrder::new(column_index, order_type));
111    }
112
113    pub fn get_current_pk_len(&self) -> usize {
115        self.pk.len()
116    }
117
118    pub fn set_vnode_col_idx(&mut self, vnode_col_idx: usize) {
119        self.vnode_col_idx = Some(vnode_col_idx);
120    }
121
122    pub fn set_value_indices(&mut self, value_indices: Vec<usize>) {
123        self.value_indices = Some(value_indices);
124    }
125
126    pub fn set_dist_key_in_pk(&mut self, dist_key_in_pk: Vec<usize>) {
127        self.dist_key_in_pk = Some(dist_key_in_pk);
128    }
129
130    fn avoid_duplicate_col_name(&mut self, column_desc: &mut ColumnDesc) {
133        if let Some(old_identity) = self.column_names.get(&column_desc.name) {
134            let column_name = column_desc.name.clone();
135            let mut identity = *old_identity;
136            loop {
137                column_desc.name = format!("{}_{}", column_name, identity);
138                identity += 1;
139                if !self.column_names.contains_key(&column_desc.name) {
140                    break;
141                }
142            }
143            *self.column_names.get_mut(&column_name).unwrap() = identity;
144        }
145        self.column_names.insert(column_desc.name.clone(), 0);
146    }
147
148    pub fn build(self, distribution_key: Vec<usize>, read_prefix_len_hint: usize) -> TableCatalog {
152        assert!(read_prefix_len_hint <= self.pk.len());
153        let watermark_columns = match self.watermark_columns {
154            Some(w) => w,
155            None => FixedBitSet::with_capacity(self.columns.len()),
156        };
157
158        if let Some(dist_key_in_pk) = &self.dist_key_in_pk {
162            let derived_dist_key = dist_key_in_pk
163                .iter()
164                .map(|idx| self.pk[*idx].column_index)
165                .collect_vec();
166            assert_eq!(
167                derived_dist_key, distribution_key,
168                "dist_key mismatch with dist_key_in_pk"
169            );
170        }
171
172        TableCatalog {
173            id: TableId::placeholder(),
174            schema_id: 0.into(),
175            database_id: 0.into(),
176            associated_source_id: None,
177            name: String::new(),
178            columns: self.columns.clone(),
179            pk: self.pk,
180            stream_key: vec![],
181            distribution_key,
182            table_type: TableType::Internal,
185            append_only: false,
186            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
187            fragment_id: OBJECT_ID_PLACEHOLDER,
188            dml_fragment_id: None,
189            vnode_col_index: self.vnode_col_idx,
190            row_id_index: None,
191            value_indices: self
192                .value_indices
193                .unwrap_or_else(|| (0..self.columns.len()).collect_vec()),
194            definition: "".into(),
195            conflict_behavior: ConflictBehavior::NoCheck,
196            version_column_indices: vec![],
197            read_prefix_len_hint,
198            version: None, watermark_columns,
200            dist_key_in_pk: self.dist_key_in_pk.unwrap_or_default(),
201            cardinality: Cardinality::unknown(), created_at_epoch: None,
203            initialized_at_epoch: None,
204            cleaned_by_watermark: false,
205            create_type: CreateType::Foreground,
208            stream_job_status: StreamJobStatus::Creating,
209            description: None,
210            initialized_at_cluster_version: None,
211            created_at_cluster_version: None,
212            retention_seconds: None,
213            cdc_table_id: None,
214            vnode_count: VnodeCount::Placeholder, webhook_info: None,
216            job_id: None,
217            engine: Engine::Hummock,
218            clean_watermark_index_in_pk: None, refreshable: false,                vector_index_info: None,
221            cdc_table_type: None,
222        }
223    }
224
225    pub fn columns(&self) -> &[ColumnCatalog] {
226        &self.columns
227    }
228}
229
230pub trait Distill {
232    fn distill<'a>(&self) -> XmlNode<'a>;
233
234    fn distill_to_string(&self) -> String {
235        let mut config = pretty_config();
236        let mut output = String::with_capacity(2048);
237        config.unicode(&mut output, &Pretty::Record(self.distill()));
238        output
239    }
240}
241
242pub(super) fn childless_record<'a>(
243    name: impl Into<Str<'a>>,
244    fields: StrAssocArr<'a>,
245) -> XmlNode<'a> {
246    XmlNode::simple_record(name, fields, Default::default())
247}
248
/// Implements `Distill` for `$ty` by delegating to the `distill_with_name` method
/// (from `DistillUnit`) of its `$core` field, labelling the record with `$name`.
///
/// The expansion also `use`s `XmlNode`, `DistillUnit` and `Distill` at the invocation site,
/// so invoking it twice in one module, or in a module that already imports those names,
/// would likely cause a name clash.
macro_rules! impl_distill_by_unit {
    ($ty:ty, $core:ident, $name:expr) => {
        use pretty_xmlish::XmlNode;
        use $crate::optimizer::plan_node::generic::DistillUnit;
        use $crate::optimizer::plan_node::utils::Distill;
        impl Distill for $ty {
            fn distill<'a>(&self) -> XmlNode<'a> {
                self.$core.distill_with_name($name)
            }
        }
    };
}
// Re-exported so other modules in the crate can invoke the macro by path.
pub(crate) use impl_distill_by_unit;
262
263pub(crate) fn column_names_pretty<'a>(schema: &Schema) -> Pretty<'a> {
264    let columns = (schema.fields.iter())
265        .map(|f| f.name.clone())
266        .map(Pretty::from)
267        .collect();
268    Pretty::Array(columns)
269}
270
271pub(crate) fn watermark_pretty<'a>(
272    watermark_columns: &WatermarkColumns,
273    schema: &Schema,
274) -> Option<Pretty<'a>> {
275    if watermark_columns.is_empty() {
276        None
277    } else {
278        let groups = watermark_columns.grouped();
279        let pretty_groups = groups
280            .values()
281            .map(|cols| {
282                Pretty::Array(
283                    cols.indices()
284                        .map(|idx| FieldDisplay(schema.fields.get(idx).unwrap()))
285                        .map(|d| Pretty::display(&d))
286                        .collect::<Vec<_>>(),
287                )
288            })
289            .collect();
290        Some(Pretty::Array(pretty_groups))
291    }
292}
293
/// Displays a list of column indices as the names of the corresponding fields of `schema`.
#[derive(Clone, Copy)]
pub struct IndicesDisplay<'a> {
    /// Column indices to display, in order.
    pub indices: &'a [usize],
    /// Schema used to look up the field name of each index.
    pub schema: &'a Schema,
}
299
300impl<'a> IndicesDisplay<'a> {
301    pub fn from_join<'b>(
303        join: &'a generic::Join<impl GenericPlanRef>,
304        input_schema: &'a Schema,
305    ) -> Pretty<'b> {
306        let col_num = join.internal_column_num();
307        let id = Self::from(&join.output_indices, col_num, input_schema);
308        id.map_or_else(|| Pretty::from("all"), Self::distill)
309    }
310
311    fn from(indices: &'a [usize], col_num: usize, schema: &'a Schema) -> Option<Self> {
313        if indices.iter().copied().eq(0..col_num) {
314            return None;
315        }
316        Some(Self { indices, schema })
317    }
318
319    pub fn distill<'b>(self) -> Pretty<'b> {
320        let vec = self.indices.iter().map(|&i| {
321            let name = self.schema.fields.get(i).unwrap().name.clone();
322            Pretty::from(name)
323        });
324        Pretty::Array(vec.collect())
325    }
326}
327
328pub(crate) fn sum_affected_row(dml: BatchPlanRef) -> Result<BatchPlanRef> {
329    let dml = RequiredDist::single().batch_enforce_if_not_satisfies(dml, &Order::any())?;
330    let sum_agg = PlanAggCall {
332        agg_type: PbAggKind::Sum.into(),
333        return_type: DataType::Int64,
334        inputs: vec![InputRef::new(0, DataType::Int64)],
335        distinct: false,
336        order_by: vec![],
337        filter: Condition::true_cond(),
338        direct_args: vec![],
339    };
340    let agg = Agg::new(vec![sum_agg], IndexSet::empty(), dml);
341    let batch_agg = BatchSimpleAgg::new(agg);
342    Ok(batch_agg.into())
343}
344
/// Builds a plan node display name as a `String`.
///
/// The result is `$name` followed by ` [p1, p2, ...]`, listing (in the given order) every
/// `$prop` whose `$cond` evaluates to true. When no condition holds, the bare name is
/// returned.
///
/// ```text
/// plan_node_name!("Foo", { "append_only", true }, { "eowc", false })
/// // => "Foo [append_only]"
/// ```
macro_rules! plan_node_name {
    ($name:literal $(, { $prop:literal, $cond:expr } )* $(,)?) => {
        {
            // `properties` is never pushed to when the macro is invoked without properties.
            #[allow(unused_mut)]
            let mut properties: Vec<&str> = vec![];
            $( if $cond { properties.push($prop); } )*
            let mut name = $name.to_string();
            if !properties.is_empty() {
                name += " [";
                name += &properties.join(", ");
                name += "]";
            }
            name
        }
    };
}
// Re-exported so other modules in the crate can invoke the macro by path.
pub(crate) use plan_node_name;
365use risingwave_pb::common::{PbBatchQueryEpoch, batch_query_epoch};
366
367pub fn infer_kv_log_store_table_catalog_inner(
368    input: &StreamPlanRef,
369    columns: &[ColumnCatalog],
370) -> TableCatalog {
371    let mut table_catalog_builder = TableCatalogBuilder::default();
372
373    let mut value_indices =
374        Vec::with_capacity(KV_LOG_STORE_PREDEFINED_COLUMNS.len() + input.schema().fields().len());
375
376    for (name, data_type) in KV_LOG_STORE_PREDEFINED_COLUMNS {
377        let indice = table_catalog_builder.add_column(&Field::with_name(data_type, name));
378        value_indices.push(indice);
379    }
380
381    table_catalog_builder.set_vnode_col_idx(VNODE_COLUMN_INDEX);
382
383    for (i, ordering) in PK_ORDERING.iter().enumerate() {
384        table_catalog_builder.add_order_column(i, *ordering);
385    }
386
387    let read_prefix_len_hint = table_catalog_builder.get_current_pk_len();
388
389    if columns.len() != input.schema().fields().len()
390        || columns
391            .iter()
392            .zip_eq_fast(input.schema().fields())
393            .any(|(c, f)| *c.data_type() != f.data_type())
394    {
395        tracing::warn!(
396            "sink schema different with upstream schema: sink columns: {:?}, input schema: {:?}.",
397            columns,
398            input.schema()
399        );
400    }
401    for field in input.schema().fields() {
402        let indice = table_catalog_builder.add_column(field);
403        value_indices.push(indice);
404    }
405    table_catalog_builder.set_value_indices(value_indices);
406
407    let dist_key = input
409        .distribution()
410        .dist_column_indices()
411        .iter()
412        .map(|idx| idx + KV_LOG_STORE_PREDEFINED_COLUMNS.len())
413        .collect_vec();
414
415    table_catalog_builder.build(dist_key, read_prefix_len_hint)
416}
417
418pub fn infer_synced_kv_log_store_table_catalog_inner(
419    input: &StreamPlanRef,
420    columns: &[Field],
421) -> TableCatalog {
422    let mut table_catalog_builder = TableCatalogBuilder::default();
423
424    let mut value_indices =
425        Vec::with_capacity(KV_LOG_STORE_PREDEFINED_COLUMNS.len() + columns.len());
426
427    for (name, data_type) in KV_LOG_STORE_PREDEFINED_COLUMNS {
428        let indice = table_catalog_builder.add_column(&Field::with_name(data_type, name));
429        value_indices.push(indice);
430    }
431
432    table_catalog_builder.set_vnode_col_idx(VNODE_COLUMN_INDEX);
433
434    for (i, ordering) in PK_ORDERING.iter().enumerate() {
435        table_catalog_builder.add_order_column(i, *ordering);
436    }
437
438    let read_prefix_len_hint = table_catalog_builder.get_current_pk_len();
439
440    let payload_indices = {
441        let mut payload_indices = Vec::with_capacity(columns.len());
442        for column in columns {
443            let payload_index = table_catalog_builder.add_column(column);
444            payload_indices.push(payload_index);
445        }
446        payload_indices
447    };
448
449    value_indices.extend(payload_indices);
450    table_catalog_builder.set_value_indices(value_indices);
451
452    let dist_key = input
454        .distribution()
455        .dist_column_indices()
456        .iter()
457        .map(|idx| idx + KV_LOG_STORE_PREDEFINED_COLUMNS.len())
458        .collect_vec();
459
460    table_catalog_builder.build(dist_key, read_prefix_len_hint)
461}
462
463pub(crate) fn plan_can_use_background_ddl(plan: &StreamPlanRef) -> bool {
468    if plan.inputs().is_empty() {
469        if plan.as_stream_source_scan().is_some()
470            || plan.as_stream_now().is_some()
471            || plan.as_stream_source().is_some()
472        {
473            true
474        } else if let Some(scan) = plan.as_stream_table_scan() {
475            scan.stream_scan_type() == StreamScanType::Backfill
476                || scan.stream_scan_type() == StreamScanType::ArrangementBackfill
477                || scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill
478                || scan.stream_scan_type() == StreamScanType::SnapshotBackfill
479        } else {
480            false
481        }
482    } else if plan.as_stream_locality_provider().is_some() {
483        false
486    } else {
487        assert!(!plan.inputs().is_empty());
488        plan.inputs().iter().all(plan_can_use_background_ddl)
489    }
490}
491
492pub fn unix_timestamp_sec_to_epoch(ts: i64) -> risingwave_common::util::epoch::Epoch {
493    let ts = ts.checked_add(1).unwrap();
494    risingwave_common::util::epoch::Epoch::from_unix_millis_or_earliest(
495        u64::try_from(ts).unwrap_or(0).checked_mul(1000).unwrap(),
496    )
497}
498
499pub fn to_batch_query_epoch(a: &Option<AsOf>) -> Result<Option<PbBatchQueryEpoch>> {
500    let Some(a) = a else {
501        return Ok(None);
502    };
503    Feature::TimeTravel.check_available()?;
504    let timestamp = match a {
505        AsOf::ProcessTime => {
506            return Err(ErrorCode::NotSupported(
507                "do not support as of proctime".to_owned(),
508                "please use as of timestamp".to_owned(),
509            )
510            .into());
511        }
512        AsOf::TimestampNum(ts) => *ts,
513        AsOf::TimestampString(ts) => {
514            let date_time = speedate::DateTime::parse_str_rfc3339(ts)
515                .map_err(|_e| anyhow!("fail to parse timestamp"))?;
516            if date_time.time.tz_offset.is_none() {
517                risingwave_expr::expr_context::TIME_ZONE::try_with(|set_time_zone| {
519                    let tz =
520                        Timestamptz::lookup_time_zone(set_time_zone).map_err(|e| anyhow!(e))?;
521                    match tz.with_ymd_and_hms(
522                        date_time.date.year.into(),
523                        date_time.date.month.into(),
524                        date_time.date.day.into(),
525                        date_time.time.hour.into(),
526                        date_time.time.minute.into(),
527                        date_time.time.second.into(),
528                    ) {
529                        MappedLocalTime::Single(d) => Ok(d.timestamp()),
530                        MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
531                            Err(anyhow!(format!(
532                                "failed to parse the timestamp {ts} with the specified time zone {tz}"
533                            )))
534                        }
535                    }
536                })??
537            } else {
538                date_time.timestamp_tz()
539            }
540        }
541        AsOf::VersionNum(_) | AsOf::VersionString(_) => {
542            return Err(ErrorCode::NotSupported(
543                "do not support as of version".to_owned(),
544                "please use as of timestamp".to_owned(),
545            )
546            .into());
547        }
548        AsOf::ProcessTimeWithInterval((value, leading_field)) => {
549            let interval = Interval::parse_with_fields(
550                value,
551                Some(crate::Binder::bind_date_time_field(*leading_field)),
552            )
553            .map_err(|_| anyhow!("fail to parse interval"))?;
554            let interval_sec = (interval.epoch_in_micros() / 1_000_000) as i64;
555            chrono::Utc::now()
556                .timestamp()
557                .checked_sub(interval_sec)
558                .ok_or_else(|| anyhow!("invalid timestamp"))?
559        }
560    };
561    Ok(Some(PbBatchQueryEpoch {
562        epoch: Some(batch_query_epoch::PbEpoch::TimeTravel(
563            unix_timestamp_sec_to_epoch(timestamp).0,
564        )),
565    }))
566}
567
568pub fn to_iceberg_time_travel_as_of(
569    a: &Option<AsOf>,
570    timezone: &String,
571) -> Result<Option<IcebergTimeTravelInfo>> {
572    Ok(match a {
573        Some(AsOf::VersionNum(v)) => Some(IcebergTimeTravelInfo::Version(*v)),
574        Some(AsOf::TimestampNum(ts)) => Some(IcebergTimeTravelInfo::TimestampMs(ts * 1000)),
575        Some(AsOf::VersionString(_)) => {
576            bail!("Unsupported version string in iceberg time travel")
577        }
578        Some(AsOf::TimestampString(ts)) => {
579            let date_time = speedate::DateTime::parse_str_rfc3339(ts)
580                .map_err(|_e| anyhow!("fail to parse timestamp"))?;
581            let timestamp = if date_time.time.tz_offset.is_none() {
582                let tz = Timestamptz::lookup_time_zone(timezone).map_err(|e| anyhow!(e))?;
584                match tz.with_ymd_and_hms(
585                    date_time.date.year.into(),
586                    date_time.date.month.into(),
587                    date_time.date.day.into(),
588                    date_time.time.hour.into(),
589                    date_time.time.minute.into(),
590                    date_time.time.second.into(),
591                ) {
592                    MappedLocalTime::Single(d) => Ok(d.timestamp()),
593                    MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
594                        Err(anyhow!(format!(
595                            "failed to parse the timestamp {ts} with the specified time zone {tz}"
596                        )))
597                    }
598                }?
599            } else {
600                date_time.timestamp_tz()
601            };
602
603            Some(IcebergTimeTravelInfo::TimestampMs(
604                timestamp * 1000 + date_time.time.microsecond as i64 / 1000,
605            ))
606        }
607        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
608            unreachable!()
609        }
610        None => None,
611    })
612}
613
614pub fn scan_ranges_as_strs(order_names: Vec<String>, scan_ranges: &Vec<ScanRange>) -> Vec<String> {
615    let mut range_strs = vec![];
616
617    let explain_max_range = 20;
618    for scan_range in scan_ranges.iter().take(explain_max_range) {
619        #[expect(clippy::disallowed_methods)]
620        let mut range_str = scan_range
621            .eq_conds
622            .iter()
623            .zip(order_names.iter())
624            .map(|(v, name)| match v {
625                Some(v) => format!("{} = {:?}", name, v),
626                None => format!("{} IS NULL", name),
627            })
628            .collect_vec();
629
630        let len = scan_range.eq_conds.len();
631        if !is_full_range(&scan_range.range) {
632            let bound_range_str = match (&scan_range.range.0, &scan_range.range.1) {
633                (Bound::Unbounded, Bound::Unbounded) => unreachable!(),
634                (Bound::Unbounded, ub) => ub_to_string(&order_names[len..], ub),
635                (lb, Bound::Unbounded) => lb_to_string(&order_names[len..], lb),
636                (lb, ub) => format!(
637                    "{} AND {}",
638                    lb_to_string(&order_names[len..], lb),
639                    ub_to_string(&order_names[len..], ub)
640                ),
641            };
642            range_str.push(bound_range_str);
643        }
644        range_strs.push(range_str.join(" AND "));
645    }
646    if scan_ranges.len() > explain_max_range {
647        range_strs.push("...".to_owned());
648    }
649    range_strs
650}
651
652pub fn ub_to_string(order_names: &[String], ub: &Bound<Vec<Option<ScalarImpl>>>) -> String {
653    match ub {
654        Bound::Included(v) => {
655            let (name, value) = row_to_string(order_names, v);
656            format!("{} <= {}", name, value)
657        }
658        Bound::Excluded(v) => {
659            let (name, value) = row_to_string(order_names, v);
660            format!("{} < {}", name, value)
661        }
662        Bound::Unbounded => unreachable!(),
663    }
664}
665
666pub fn lb_to_string(order_names: &[String], lb: &Bound<Vec<Option<ScalarImpl>>>) -> String {
667    match lb {
668        Bound::Included(v) => {
669            let (name, value) = row_to_string(order_names, v);
670            format!("{} >= {}", name, value)
671        }
672        Bound::Excluded(v) => {
673            let (name, value) = row_to_string(order_names, v);
674            format!("{} > {}", name, value)
675        }
676        Bound::Unbounded => unreachable!(),
677    }
678}
679
680pub fn row_to_string(
681    order_names: &[String],
682    struct_values: &Vec<Option<ScalarImpl>>,
683) -> (String, String) {
684    let mut names = vec![];
685    let mut values = vec![];
686    #[expect(clippy::disallowed_methods)]
687    for (name, value) in order_names.iter().zip(struct_values.iter()) {
688        names.push(name);
689        match value {
690            Some(v) => values.push(format!("{:?}", v)),
691            None => values.push("null".to_owned()),
692        }
693    }
694    if names.len() == 1 {
695        (names[0].clone(), values[0].clone())
696    } else {
697        (
698            format!("({})", names.iter().join(", ")),
699            format!("({})", values.iter().join(", ")),
700        )
701    }
702}