risingwave_frontend/optimizer/plan_node/
derive.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use fixedbitset::FixedBitSet;
18use itertools::Itertools;
19use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Schema, USER_COLUMN_ID_OFFSET};
20use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
21
22use super::StreamPlanRef;
23use crate::error::{ErrorCode, Result};
24use crate::optimizer::plan_node::generic::PhysicalPlanRef;
25use crate::optimizer::property::{Order, RequiredDist};
26
27pub(crate) fn derive_columns(
28    input_schema: &Schema,
29    out_names: Vec<String>,
30    user_cols: &FixedBitSet,
31) -> Result<Vec<ColumnCatalog>> {
32    // Validate and deduplicate column names.
33    let mut col_names = HashSet::new();
34    for name in &out_names {
35        if !col_names.insert(name.clone()) {
36            Err(ErrorCode::InvalidInputSyntax(format!(
37                "column \"{}\" specified more than once",
38                name
39            )))?;
40        }
41    }
42
43    let mut out_name_iter = out_names.into_iter();
44    let columns = input_schema
45        .fields()
46        .iter()
47        .enumerate()
48        .map(|(i, field)| {
49            let mut c = ColumnCatalog {
50                column_desc: ColumnDesc::from_field_with_column_id(
51                    field,
52                    i as i32 + USER_COLUMN_ID_OFFSET,
53                ),
54                is_hidden: !user_cols.contains(i),
55            };
56            c.column_desc.name = if !c.is_hidden {
57                out_name_iter.next().unwrap()
58            } else {
59                let mut name = field.name.clone();
60                let mut count = 0;
61
62                while !col_names.insert(name.clone()) {
63                    count += 1;
64                    name = format!("{}#{}", field.name, count);
65                }
66
67                name
68            };
69            c
70        })
71        .collect_vec();
72
73    // We should use up all of the `out_name`s
74    assert_eq!(out_name_iter.next(), None);
75
76    Ok(columns)
77}
78
79/// Derive the pk and the stream key for tables and sinks.
80pub(crate) fn derive_pk(
81    input: StreamPlanRef,
82    user_distributed_by: RequiredDist,
83    user_order_by: Order,
84    columns: &[ColumnCatalog],
85) -> (Vec<ColumnOrder>, Vec<usize>) {
86    // Note(congyi): avoid pk duplication
87
88    // Add distribution key columns to stream key
89    let mut stream_key = match user_distributed_by {
90        RequiredDist::PhysicalDist(distribution) => distribution.dist_column_indices().to_vec(),
91        RequiredDist::ShardByKey(_) | RequiredDist::ShardByExactKey(_) => {
92            unreachable!("Right now, it is not possible to have ShardByKey/ShardByExactKey here")
93        }
94        RequiredDist::AnyShard | RequiredDist::Any => vec![],
95    };
96
97    // Deduplicate: only add keys from input that aren't already in stream_key
98    let mut seen: HashSet<usize> = stream_key.iter().copied().collect();
99    for key in input.expect_stream_key().iter().copied() {
100        if seen.insert(key) {
101            stream_key.push(key);
102        }
103    }
104
105    let schema = input.schema();
106
107    // Assert the uniqueness of column names and IDs, including hidden columns.
108    if let Some(name) = columns.iter().map(|c| c.name()).duplicates().next() {
109        panic!("duplicated column name \"{name}\"");
110    }
111    if let Some(id) = columns.iter().map(|c| c.column_id()).duplicates().next() {
112        panic!("duplicated column ID {id}");
113    }
114    // Assert that the schema of given `columns` is correct.
115    assert_eq!(
116        columns.iter().map(|c| c.data_type().clone()).collect_vec(),
117        input.schema().data_types()
118    );
119
120    let mut in_order = FixedBitSet::with_capacity(schema.len());
121    let mut pk = vec![];
122
123    let func_dep = input.functional_dependency();
124    let user_order_by = func_dep.minimize_order_key(
125        user_order_by,
126        // The plan could be `SomeShard` in some cases. Ignore the requirement on distribution key
127        // when minimizing the order key.
128        input
129            .distribution()
130            .dist_column_indices_opt()
131            .unwrap_or(&[]),
132    );
133
134    for order in &user_order_by.column_orders {
135        let idx = order.column_index;
136        pk.push(*order);
137        in_order.insert(idx);
138    }
139
140    for &idx in &stream_key {
141        if in_order.contains(idx) {
142            continue;
143        }
144        pk.push(ColumnOrder::new(idx, OrderType::ascending()));
145        in_order.insert(idx);
146    }
147
148    (pk, stream_key)
149}