// risingwave_frontend/optimizer/plan_node/generic/project.rs
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use fixedbitset::FixedBitSet;
use pretty_xmlish::{Pretty, StrAssocArr};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::util::iter_util::ZipEqFast;
use super::{GenericPlanNode, GenericPlanRef};
use crate::expr::{
assert_input_ref, Expr, ExprDisplay, ExprImpl, ExprRewriter, ExprType, ExprVisitor,
FunctionCall, InputRef,
};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
/// Rejects expression kinds that a `Project` cannot evaluate.
///
/// The checks run in a fixed order (subquery, aggregate, table function, window function),
/// and the first match decides which description is returned.
///
/// # Errors
///
/// Returns a short human-readable description of the first disallowed construct found.
fn check_expr_type(expr: &ExprImpl) -> std::result::Result<(), &'static str> {
    let offending = if expr.has_subquery() {
        Some("subquery")
    } else if expr.has_agg_call() {
        Some("aggregate function")
    } else if expr.has_table_function() {
        Some("table function")
    } else if expr.has_window_function() {
        Some("window function")
    } else {
        None
    };
    offending.map_or(Ok(()), Err)
}
/// Generic projection node: its output columns are `exprs`, computed over `input`.
///
/// Output names come from `field_names` where an alias is present (see `schema()` for the
/// exact naming rules).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// The private unit field steers construction through this module's checked constructors
// (`new` and the `with_*` helpers), which validate every expression; clippy would rather see
// `#[non_exhaustive]`, but that attribute only restricts other crates, not other modules.
#[allow(clippy::manual_non_exhaustive)]
pub struct Project<PlanRef> {
    /// One expression per output column.
    pub exprs: Vec<ExprImpl>,
    /// Optional output-column aliases, keyed by output column index.
    pub field_names: BTreeMap<usize, String>,
    /// The child plan whose columns the expressions reference.
    pub input: PlanRef,
    _private: (),
}
impl<PlanRef> Project<PlanRef> {
    /// Rewrites every projected expression with `r`, preserving output-column order.
    pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
        // `rewrite_expr` consumes its argument and the old list is replaced wholesale, so move
        // the expressions out rather than cloning each one.
        self.exprs = std::mem::take(&mut self.exprs)
            .into_iter()
            .map(|e| r.rewrite_expr(e))
            .collect();
    }

    /// Walks every projected expression with the visitor `v`, in output-column order.
    pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.exprs.iter().for_each(|e| v.visit_expr(e));
    }
}
impl<PlanRef: GenericPlanRef> GenericPlanNode for Project<PlanRef> {
    /// Derives one output field per projected expression.
    ///
    /// - A bare column reference inherits the input field's name, struct sub-fields and type
    ///   name; an entry in `field_names` overrides the name.
    /// - Any other column reference or literal is named after its display form.
    /// - Every remaining expression uses its `field_names` alias, or else a fresh `$exprN`
    ///   name whose number is drawn from the optimizer context.
    fn schema(&self) -> Schema {
        let out_to_in = self.o2i_col_mapping();
        let input_schema = self.input.schema();
        let ctx = self.ctx();

        let mut fields = Vec::with_capacity(self.exprs.len());
        for (idx, expr) in self.exprs.iter().enumerate() {
            let (name, sub_fields, type_name) = if let Some(src_idx) = out_to_in.try_map(idx) {
                let src = &input_schema.fields()[src_idx];
                let name = self
                    .field_names
                    .get(&idx)
                    .cloned()
                    .unwrap_or_else(|| src.name.clone());
                (name, src.sub_fields.clone(), src.type_name.clone())
            } else if matches!(expr, ExprImpl::InputRef(_) | ExprImpl::Literal(_)) {
                (
                    format!("{:?}", ExprDisplay { expr, input_schema }),
                    vec![],
                    String::new(),
                )
            } else {
                // Only draw a display id when no alias was supplied for this column.
                let name = match self.field_names.get(&idx) {
                    Some(alias) => alias.clone(),
                    None => format!("$expr{}", ctx.next_expr_display_id()),
                };
                (name, vec![], String::new())
            };
            fields.push(Field::with_struct(
                expr.return_type(),
                name,
                sub_fields,
                type_name,
            ));
        }
        Schema { fields }
    }

    /// Maps the input's stream key through the projection.
    ///
    /// Yields `None` when the input has no stream key, or when any key column is not carried
    /// through as a bare column reference.
    fn stream_key(&self) -> Option<Vec<usize>> {
        let in_to_out = self.i2o_col_mapping();
        let input_key = self.input.stream_key()?;
        input_key
            .iter()
            .map(|&key_col| in_to_out.try_map(key_col))
            .collect()
    }

    /// The optimizer context is shared with (and taken from) the input.
    fn ctx(&self) -> OptimizerContextRef {
        self.input.ctx()
    }

    /// Re-expresses the input's functional dependencies in terms of output columns.
    fn functional_dependency(&self) -> FunctionalDependencySet {
        let in_to_out = self.i2o_col_mapping();
        let input_fds = self.input.functional_dependency().clone();
        in_to_out.rewrite_functional_dependency_set(input_fds)
    }
}
impl<PlanRef: GenericPlanRef> Project<PlanRef> {
pub fn new(exprs: Vec<ExprImpl>, input: PlanRef) -> Self {
for expr in &exprs {
assert_input_ref!(expr, input.schema().fields().len());
check_expr_type(expr)
.map_err(|expr| format!("{expr} should not in Project operator"))
.unwrap();
}
Project {
exprs,
field_names: Default::default(),
input,
_private: (),
}
}
pub fn with_mapping(input: PlanRef, mapping: ColIndexMapping) -> Self {
if mapping.target_size() == 0 {
return Self::new(vec![], input);
};
let mut input_refs = vec![None; mapping.target_size()];
for (src, tar) in mapping.mapping_pairs() {
assert_eq!(input_refs[tar], None);
input_refs[tar] = Some(src);
}
let input_schema = input.schema();
let exprs: Vec<ExprImpl> = input_refs
.into_iter()
.map(|i| i.unwrap())
.map(|i| InputRef::new(i, input_schema.fields()[i].data_type()).into())
.collect();
Self::new(exprs, input)
}
pub fn with_out_fields(input: PlanRef, out_fields: &FixedBitSet) -> Self {
Self::with_out_col_idx(input, out_fields.ones())
}
pub fn with_out_col_idx(input: PlanRef, out_fields: impl Iterator<Item = usize>) -> Self {
let input_schema = input.schema();
let exprs = out_fields
.map(|index| InputRef::new(index, input_schema[index].data_type()).into())
.collect();
Self::new(exprs, input)
}
pub fn with_vnode_col(input: PlanRef, dist_key: &[usize]) -> Self {
let input_fields = input.schema().fields();
let mut new_exprs: Vec<_> = input_fields
.iter()
.enumerate()
.map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into())
.collect();
new_exprs.push(
FunctionCall::new(
ExprType::Vnode,
dist_key
.iter()
.map(|idx| InputRef::new(*idx, input_fields[*idx].data_type()).into())
.collect(),
)
.expect("Vnode function call should be valid here")
.into(),
);
let vnode_expr_idx = new_exprs.len() - 1;
let mut new = Self::new(new_exprs, input);
new.field_names.insert(vnode_expr_idx, "_vnode".to_string());
new
}
pub fn decompose(self) -> (Vec<ExprImpl>, PlanRef) {
(self.exprs, self.input)
}
pub fn fields_pretty<'a>(&self, schema: &Schema) -> StrAssocArr<'a> {
let f = |t| Pretty::debug(&t);
let e = Pretty::Array(self.exprs_for_display(schema).iter().map(f).collect());
vec![("exprs", e)]
}
fn exprs_for_display<'a>(&'a self, schema: &Schema) -> Vec<AliasedExpr<'a>> {
self.exprs
.iter()
.zip_eq_fast(schema.fields().iter())
.map(|(expr, field)| AliasedExpr {
expr: ExprDisplay {
expr,
input_schema: self.input.schema(),
},
alias: {
match expr {
ExprImpl::InputRef(_) | ExprImpl::Literal(_) => None,
_ => Some(field.name.clone()),
}
},
})
.collect()
}
pub fn o2i_col_mapping(&self) -> ColIndexMapping {
let exprs = &self.exprs;
let input_len = self.input.schema().len();
let mut map = vec![None; exprs.len()];
for (i, expr) in exprs.iter().enumerate() {
if let ExprImpl::InputRef(input) = expr {
map[i] = Some(input.index())
}
}
ColIndexMapping::new(map, input_len)
}
pub fn i2o_col_mapping(&self) -> ColIndexMapping {
let exprs = &self.exprs;
let input_len = self.input.schema().len();
let mut map = vec![None; input_len];
for (i, expr) in exprs.iter().enumerate() {
if let ExprImpl::InputRef(input) = expr {
map[input.index()] = Some(i)
}
}
ColIndexMapping::new(map, exprs.len())
}
pub fn is_all_inputref(&self) -> bool {
self.exprs
.iter()
.all(|expr| matches!(expr, ExprImpl::InputRef(_)))
}
pub fn is_identity(&self) -> bool {
self.exprs.len() == self.input.schema().len()
&& self
.exprs
.iter()
.zip_eq_fast(self.input.schema().fields())
.enumerate()
.all(|(i, (expr, field))| {
matches!(expr, ExprImpl::InputRef(input_ref) if **input_ref == InputRef::new(i, field.data_type()))
})
}
pub fn try_as_projection(&self) -> Option<Vec<usize>> {
self.exprs
.iter()
.map(|expr| match expr {
ExprImpl::InputRef(input_ref) => Some(input_ref.index),
_ => None,
})
.collect::<Option<Vec<_>>>()
}
pub(crate) fn likely_produces_noop_updates(&self) -> bool {
struct HasJsonbAccess {
has: bool,
}
impl ExprVisitor for HasJsonbAccess {
fn visit_function_call(&mut self, func_call: &FunctionCall) {
if matches!(
func_call.func_type(),
ExprType::JsonbAccess
| ExprType::JsonbAccessStr
| ExprType::JsonbExtractPath
| ExprType::JsonbExtractPathVariadic
| ExprType::JsonbExtractPathText
| ExprType::JsonbExtractPathTextVariadic
| ExprType::JsonbPathExists
| ExprType::JsonbPathMatch
| ExprType::JsonbPathQueryArray
| ExprType::JsonbPathQueryFirst
) {
self.has = true;
}
}
}
self.exprs.iter().any(|expr| {
let mut visitor = HasJsonbAccess { has: false };
visitor.visit_expr(expr);
visitor.has
})
}
}
/// Incrementally collects deduplicated projection expressions, then builds a [`Project`].
#[derive(Default)]
pub struct ProjectBuilder {
    /// Expressions in insertion order; an expression's position is its output column index.
    exprs: Vec<ExprImpl>,
    /// Reverse lookup from expression to its index in `exprs`, used to deduplicate.
    exprs_index: HashMap<ExprImpl, usize>,
}
impl ProjectBuilder {
    /// Adds `expr` unless an equal expression is already present, and returns its column index.
    ///
    /// # Errors
    ///
    /// Returns a description of the construct if `expr` contains something a `Project` cannot
    /// evaluate (subquery, aggregate, table or window function).
    pub fn add_expr(&mut self, expr: &ExprImpl) -> std::result::Result<usize, &'static str> {
        check_expr_type(expr)?;
        if let Some(&existing) = self.exprs_index.get(expr) {
            return Ok(existing);
        }
        let fresh = self.exprs.len();
        self.exprs.push(expr.clone());
        self.exprs_index.insert(expr.clone(), fresh);
        Ok(fresh)
    }

    /// Returns the expression stored at column `index`, if any.
    pub fn get_expr(&self, index: usize) -> Option<&ExprImpl> {
        self.exprs.get(index)
    }

    /// Looks up the column index of `expr`.
    ///
    /// Yields `None` if `expr` was never added, or if it is a kind `add_expr` would reject.
    pub fn expr_index(&self, expr: &ExprImpl) -> Option<usize> {
        check_expr_type(expr)
            .ok()
            .and_then(|()| self.exprs_index.get(expr).copied())
    }

    /// Consumes the builder and creates a [`Project`] over `input` with the collected exprs.
    pub fn build<PlanRef: GenericPlanRef>(self, input: PlanRef) -> Project<PlanRef> {
        Project::new(self.exprs, input)
    }

    /// Number of distinct expressions collected so far.
    pub fn exprs_len(&self) -> usize {
        self.exprs.len()
    }
}
/// An expression paired with an optional output alias, used when rendering projection lists.
pub struct AliasedExpr<'a> {
    /// The expression, bundled with the input schema used to display its column references.
    pub expr: ExprDisplay<'a>,
    /// Name printed after `as`; with `None`, only the expression is printed.
    pub alias: Option<String>,
}
impl fmt::Debug for AliasedExpr<'_> {
    /// Writes the expression, followed by ` as <alias>` when an alias is present.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}", self.expr)?;
        if let Some(alias) = &self.alias {
            write!(f, " as {}", alias)?;
        }
        Ok(())
    }
}