risingwave_frontend/optimizer/plan_node/
logical_intersect.rsuse itertools::Itertools;
use risingwave_common::catalog::Schema;
use super::utils::impl_distill_by_unit;
use super::{
ColPrunable, ExprRewritable, Logical, PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream,
};
use crate::error::Result;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::{
generic, ColumnPruningContext, PlanTreeNode, PredicatePushdownContext, RewriteStreamContext,
ToStreamContext,
};
use crate::utils::{ColIndexMapping, Condition};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalIntersect {
pub base: PlanBase<Logical>,
core: generic::Intersect<PlanRef>,
}
impl LogicalIntersect {
pub fn new(all: bool, inputs: Vec<PlanRef>) -> Self {
assert!(Schema::all_type_eq(inputs.iter().map(|x| x.schema())));
let core = generic::Intersect { all, inputs };
let base = PlanBase::new_logical_with_core(&core);
LogicalIntersect { base, core }
}
pub fn create(all: bool, inputs: Vec<PlanRef>) -> PlanRef {
LogicalIntersect::new(all, inputs).into()
}
pub fn all(&self) -> bool {
self.core.all
}
}
impl PlanTreeNode for LogicalIntersect {
fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> {
self.core.inputs.clone().into_iter().collect()
}
fn clone_with_inputs(&self, inputs: &[crate::optimizer::PlanRef]) -> PlanRef {
Self::new(self.all(), inputs.to_vec()).into()
}
}
impl_distill_by_unit!(LogicalIntersect, core, "LogicalIntersect");
impl ColPrunable for LogicalIntersect {
fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
let new_inputs = self
.inputs()
.iter()
.map(|input| input.prune_col(required_cols, ctx))
.collect_vec();
self.clone_with_inputs(&new_inputs)
}
}
impl ExprRewritable for LogicalIntersect {}
impl ExprVisitable for LogicalIntersect {}
impl PredicatePushdown for LogicalIntersect {
fn predicate_pushdown(
&self,
predicate: Condition,
ctx: &mut PredicatePushdownContext,
) -> PlanRef {
let new_inputs = self
.inputs()
.iter()
.map(|input| input.predicate_pushdown(predicate.clone(), ctx))
.collect_vec();
self.clone_with_inputs(&new_inputs)
}
}
impl ToBatch for LogicalIntersect {
fn to_batch(&self) -> Result<PlanRef> {
unimplemented!()
}
}
impl ToStream for LogicalIntersect {
fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
unimplemented!()
}
fn logical_rewrite_for_stream(
&self,
_ctx: &mut RewriteStreamContext,
) -> Result<(PlanRef, ColIndexMapping)> {
unimplemented!()
}
}