// risingwave_frontend/optimizer/plan_node/derive.rs
use std::collections::HashSet;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Schema, USER_COLUMN_ID_OFFSET};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use super::PlanRef;
use crate::error::{ErrorCode, Result};
use crate::optimizer::plan_node::generic::PhysicalPlanRef;
use crate::optimizer::property::Order;
/// Builds one [`ColumnCatalog`] per field of `input_schema`.
///
/// * Fields whose index is set in `user_cols` are visible; they take their names from
///   `out_names`, consumed in field order.
/// * All other fields are hidden; they keep the field's own name, with a `#<n>` suffix
///   appended when that name is already taken by another column.
///
/// The column at field index `i` gets the column ID `i + USER_COLUMN_ID_OFFSET`.
///
/// # Errors
///
/// Returns `ErrorCode::InvalidInputSyntax` if `out_names` contains the same name twice.
///
/// # Panics
///
/// Panics if the number of entries in `out_names` differs from the number of visible columns.
pub(crate) fn derive_columns(
    input_schema: &Schema,
    out_names: Vec<String>,
    user_cols: &FixedBitSet,
) -> Result<Vec<ColumnCatalog>> {
    // Every name claimed so far. Seeded with all user-provided names up front so that a
    // hidden column can never steal a name a later visible column is going to use.
    let mut taken_names = HashSet::new();
    for out_name in &out_names {
        if !taken_names.insert(out_name.clone()) {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "column \"{}\" specified more than once",
                out_name
            ))
            .into());
        }
    }

    let mut visible_names = out_names.into_iter();
    let mut columns = Vec::with_capacity(input_schema.fields().len());

    for (idx, field) in input_schema.fields().iter().enumerate() {
        let mut column_desc =
            ColumnDesc::from_field_with_column_id(field, idx as i32 + USER_COLUMN_ID_OFFSET);
        let is_hidden = !user_cols.contains(idx);

        column_desc.name = if is_hidden {
            // Start from the field's own name and keep bumping the numeric suffix until the
            // candidate is not already in use.
            let mut candidate = field.name.clone();
            let mut suffix = 0;
            loop {
                if taken_names.insert(candidate.clone()) {
                    break candidate;
                }
                suffix += 1;
                candidate = format!("{}#{}", field.name, suffix);
            }
        } else {
            // Visible columns consume the user-provided names in order.
            visible_names.next().unwrap()
        };

        columns.push(ColumnCatalog {
            column_desc,
            is_hidden,
        });
    }

    // Every user-provided name must have been assigned to a visible column.
    assert_eq!(visible_names.next(), None);

    Ok(columns)
}
/// Derives the primary key ordering and the stream key for a plan built on top of `input`.
///
/// Returns `(pk, stream_key)` where:
/// * `stream_key` is the input's stream key with repeated indices removed;
/// * `pk` starts with the column orders of `user_order_by` after it has been minimized through
///   the input's functional dependencies, followed by every stream key column that is not
///   already part of that order, each appended in ascending order.
///
/// # Panics
///
/// Panics if `columns` contains a duplicated column name or column ID, or if the data types of
/// `columns` do not match the data types of the input schema. `expect_stream_key` presumably
/// panics as well when the input has no stream key — confirm against its definition.
pub(crate) fn derive_pk(
    input: PlanRef,
    user_order_by: Order,
    columns: &[ColumnCatalog],
) -> (Vec<ColumnOrder>, Vec<usize>) {
    // De-duplicated stream key of the input.
    let stream_key = input
        .expect_stream_key()
        .iter()
        .copied()
        .unique()
        .collect_vec();
    let schema = input.schema();

    // Sanity checks on the provided catalogs: unique names, unique IDs, matching types.
    let duplicated_name = columns.iter().map(|c| c.name()).duplicates().next();
    if let Some(name) = duplicated_name {
        panic!("duplicated column name \"{name}\"");
    }
    let duplicated_id = columns.iter().map(|c| c.column_id()).duplicates().next();
    if let Some(id) = duplicated_id {
        panic!("duplicated column ID {id}");
    }
    let column_types = columns.iter().map(|c| c.data_type().clone()).collect_vec();
    assert_eq!(column_types, schema.data_types());

    // Drop order columns that are made redundant by the functional dependencies.
    let minimized_order = input
        .functional_dependency()
        .minimize_order_key(user_order_by, input.distribution().dist_column_indices());

    // Tracks which column indices are already part of `pk`.
    let mut covered = FixedBitSet::with_capacity(schema.len());
    let mut pk = vec![];

    // The user-requested ordering comes first.
    for column_order in &minimized_order.column_orders {
        pk.push(column_order.clone());
        covered.insert(column_order.column_index);
    }

    // Then the remaining stream key columns, in ascending order.
    for &key_idx in &stream_key {
        if !covered.contains(key_idx) {
            pk.push(ColumnOrder::new(key_idx, OrderType::ascending()));
            covered.insert(key_idx);
        }
    }

    (pk, stream_key)
}