risingwave_frontend/optimizer/plan_node/generic/
project_set.rsuse pretty_xmlish::{Pretty, Str, XmlNode};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
use crate::expr::{Expr, ExprDisplay, ExprImpl, ExprRewriter, ExprVisitor};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::plan_node::batch::BatchPlanRef;
use crate::optimizer::plan_node::utils::childless_record;
use crate::optimizer::property::{FunctionalDependencySet, Order};
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProjectSet<PlanRef> {
pub select_list: Vec<ExprImpl>,
pub input: PlanRef,
}
impl<PlanRef> ProjectSet<PlanRef> {
pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
self.select_list = self
.select_list
.iter()
.map(|e| r.rewrite_expr(e.clone()))
.collect();
}
pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
self.select_list.iter().for_each(|e| v.visit_expr(e));
}
pub(crate) fn output_len(&self) -> usize {
self.select_list.len() + 1
}
pub fn decompose(self) -> (Vec<ExprImpl>, PlanRef) {
(self.select_list, self.input)
}
}
impl<PlanRef> DistillUnit for ProjectSet<PlanRef> {
fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
let fields = vec![("select_list", Pretty::debug(&self.select_list))];
childless_record(name, fields)
}
}
impl<PlanRef: GenericPlanRef> GenericPlanNode for ProjectSet<PlanRef> {
fn schema(&self) -> Schema {
let input_schema = self.input.schema();
let o2i = self.o2i_col_mapping();
let mut fields = vec![Field::with_name(DataType::Int64, "projected_row_id")];
fields.extend(self.select_list.iter().enumerate().map(|(idx, expr)| {
let idx = idx + 1;
let (name, sub_fields, type_name) = match o2i.try_map(idx) {
Some(input_idx) => {
let field = input_schema.fields()[input_idx].clone();
(field.name, field.sub_fields, field.type_name)
}
None => (
format!("{:?}", ExprDisplay { expr, input_schema }),
vec![],
String::new(),
),
};
Field::with_struct(expr.return_type(), name, sub_fields, type_name)
}));
Schema { fields }
}
fn stream_key(&self) -> Option<Vec<usize>> {
let i2o = self.i2o_col_mapping();
let mut pk = self
.input
.stream_key()?
.iter()
.map(|pk_col| i2o.try_map(*pk_col))
.collect::<Option<Vec<_>>>()
.unwrap_or_default();
pk.push(0);
Some(pk)
}
fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}
fn functional_dependency(&self) -> FunctionalDependencySet {
let i2o = self.i2o_col_mapping();
i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone())
}
}
impl<PlanRef: GenericPlanRef> ProjectSet<PlanRef> {
pub fn o2i_col_mapping(&self) -> ColIndexMapping {
let input_len = self.input.schema().len();
let mut map = vec![None; 1 + self.select_list.len()];
for (i, item) in self.select_list.iter().enumerate() {
if let ExprImpl::InputRef(input) = item {
map[1 + i] = Some(input.index())
}
}
ColIndexMapping::new(map, input_len)
}
pub fn i2o_col_mapping(&self) -> ColIndexMapping {
let input_len = self.input.schema().len();
let mut map = vec![None; input_len];
for (i, item) in self.select_list.iter().enumerate() {
if let ExprImpl::InputRef(input) = item {
map[input.index()] = Some(1 + i)
}
}
ColIndexMapping::new(map, 1 + self.select_list.len())
}
}
impl<PlanRef: BatchPlanRef> ProjectSet<PlanRef> {
pub fn get_out_column_index_order(&self) -> Order {
self.i2o_col_mapping()
.rewrite_provided_order(self.input.order())
}
}