risingwave_frontend/optimizer/plan_node/
derive.rs1use std::collections::HashSet;
16
17use fixedbitset::FixedBitSet;
18use itertools::Itertools;
19use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Schema, USER_COLUMN_ID_OFFSET};
20use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
21
22use super::PlanRef;
23use crate::error::{ErrorCode, Result};
24use crate::optimizer::plan_node::generic::PhysicalPlanRef;
25use crate::optimizer::property::Order;
26
27pub(crate) fn derive_columns(
28 input_schema: &Schema,
29 out_names: Vec<String>,
30 user_cols: &FixedBitSet,
31) -> Result<Vec<ColumnCatalog>> {
32 let mut col_names = HashSet::new();
34 for name in &out_names {
35 if !col_names.insert(name.clone()) {
36 Err(ErrorCode::InvalidInputSyntax(format!(
37 "column \"{}\" specified more than once",
38 name
39 )))?;
40 }
41 }
42
43 let mut out_name_iter = out_names.into_iter();
44 let columns = input_schema
45 .fields()
46 .iter()
47 .enumerate()
48 .map(|(i, field)| {
49 let mut c = ColumnCatalog {
50 column_desc: ColumnDesc::from_field_with_column_id(
51 field,
52 i as i32 + USER_COLUMN_ID_OFFSET,
53 ),
54 is_hidden: !user_cols.contains(i),
55 };
56 c.column_desc.name = if !c.is_hidden {
57 out_name_iter.next().unwrap()
58 } else {
59 let mut name = field.name.clone();
60 let mut count = 0;
61
62 while !col_names.insert(name.clone()) {
63 count += 1;
64 name = format!("{}#{}", field.name, count);
65 }
66
67 name
68 };
69 c
70 })
71 .collect_vec();
72
73 assert_eq!(out_name_iter.next(), None);
75
76 Ok(columns)
77}
78
79pub(crate) fn derive_pk(
81 input: PlanRef,
82 user_order_by: Order,
83 columns: &[ColumnCatalog],
84) -> (Vec<ColumnOrder>, Vec<usize>) {
85 let stream_key = input
87 .expect_stream_key()
88 .iter()
89 .copied()
90 .unique()
91 .collect_vec();
92 let schema = input.schema();
93
94 if let Some(name) = columns.iter().map(|c| c.name()).duplicates().next() {
96 panic!("duplicated column name \"{name}\"");
97 }
98 if let Some(id) = columns.iter().map(|c| c.column_id()).duplicates().next() {
99 panic!("duplicated column ID {id}");
100 }
101 assert_eq!(
103 columns.iter().map(|c| c.data_type().clone()).collect_vec(),
104 input.schema().data_types()
105 );
106
107 let mut in_order = FixedBitSet::with_capacity(schema.len());
108 let mut pk = vec![];
109
110 let func_dep = input.functional_dependency();
111 let user_order_by =
112 func_dep.minimize_order_key(user_order_by, input.distribution().dist_column_indices());
113
114 for order in &user_order_by.column_orders {
115 let idx = order.column_index;
116 pk.push(order.clone());
117 in_order.insert(idx);
118 }
119
120 for &idx in &stream_key {
121 if in_order.contains(idx) {
122 continue;
123 }
124 pk.push(ColumnOrder::new(idx, OrderType::ascending()));
125 in_order.insert(idx);
126 }
127
128 (pk, stream_key)
129}