risingwave_frontend/optimizer/plan_node/derive.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use fixedbitset::FixedBitSet;
18use itertools::Itertools;
19use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Schema, USER_COLUMN_ID_OFFSET};
20use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
21
22use super::PlanRef;
23use crate::error::{ErrorCode, Result};
24use crate::optimizer::plan_node::generic::PhysicalPlanRef;
25use crate::optimizer::property::Order;
26
27pub(crate) fn derive_columns(
28    input_schema: &Schema,
29    out_names: Vec<String>,
30    user_cols: &FixedBitSet,
31) -> Result<Vec<ColumnCatalog>> {
32    // Validate and deduplicate column names.
33    let mut col_names = HashSet::new();
34    for name in &out_names {
35        if !col_names.insert(name.clone()) {
36            Err(ErrorCode::InvalidInputSyntax(format!(
37                "column \"{}\" specified more than once",
38                name
39            )))?;
40        }
41    }
42
43    let mut out_name_iter = out_names.into_iter();
44    let columns = input_schema
45        .fields()
46        .iter()
47        .enumerate()
48        .map(|(i, field)| {
49            let mut c = ColumnCatalog {
50                column_desc: ColumnDesc::from_field_with_column_id(
51                    field,
52                    i as i32 + USER_COLUMN_ID_OFFSET,
53                ),
54                is_hidden: !user_cols.contains(i),
55            };
56            c.column_desc.name = if !c.is_hidden {
57                out_name_iter.next().unwrap()
58            } else {
59                let mut name = field.name.clone();
60                let mut count = 0;
61
62                while !col_names.insert(name.clone()) {
63                    count += 1;
64                    name = format!("{}#{}", field.name, count);
65                }
66
67                name
68            };
69            c
70        })
71        .collect_vec();
72
73    // We should use up all of the `out_name`s
74    assert_eq!(out_name_iter.next(), None);
75
76    Ok(columns)
77}
78
79/// Derive the pk and the stream key for tables and sinks.
80pub(crate) fn derive_pk(
81    input: PlanRef,
82    user_order_by: Order,
83    columns: &[ColumnCatalog],
84) -> (Vec<ColumnOrder>, Vec<usize>) {
85    // Note(congyi): avoid pk duplication
86    let stream_key = input
87        .expect_stream_key()
88        .iter()
89        .copied()
90        .unique()
91        .collect_vec();
92    let schema = input.schema();
93
94    // Assert the uniqueness of column names and IDs, including hidden columns.
95    if let Some(name) = columns.iter().map(|c| c.name()).duplicates().next() {
96        panic!("duplicated column name \"{name}\"");
97    }
98    if let Some(id) = columns.iter().map(|c| c.column_id()).duplicates().next() {
99        panic!("duplicated column ID {id}");
100    }
101    // Assert that the schema of given `columns` is correct.
102    assert_eq!(
103        columns.iter().map(|c| c.data_type().clone()).collect_vec(),
104        input.schema().data_types()
105    );
106
107    let mut in_order = FixedBitSet::with_capacity(schema.len());
108    let mut pk = vec![];
109
110    let func_dep = input.functional_dependency();
111    let user_order_by =
112        func_dep.minimize_order_key(user_order_by, input.distribution().dist_column_indices());
113
114    for order in &user_order_by.column_orders {
115        let idx = order.column_index;
116        pk.push(order.clone());
117        in_order.insert(idx);
118    }
119
120    for &idx in &stream_key {
121        if in_order.contains(idx) {
122            continue;
123        }
124        pk.push(ColumnOrder::new(idx, OrderType::ascending()));
125        in_order.insert(idx);
126    }
127
128    (pk, stream_key)
129}