risingwave_frontend/optimizer/plan_visitor/
cardinality_visitor.rsuse std::collections::HashSet;
use std::ops::{Mul, Sub};
use risingwave_pb::plan_common::JoinType;
use super::{DefaultBehavior, DefaultValue, PlanVisitor};
use crate::optimizer::plan_node::generic::TopNLimit;
use crate::optimizer::plan_node::{
self, PlanNode, PlanTreeNode, PlanTreeNodeBinary, PlanTreeNodeUnary,
};
use crate::optimizer::plan_visitor::PlanRef;
use crate::optimizer::property::Cardinality;
pub struct CardinalityVisitor;
impl CardinalityVisitor {
fn visit_predicate(
input: &dyn PlanNode,
input_card: Cardinality,
eq_set: HashSet<usize>,
) -> Cardinality {
let unique_keys: Vec<HashSet<_>> = input
.stream_key()
.into_iter()
.map(|s| s.iter().copied().collect())
.collect();
if unique_keys
.iter()
.any(|unique_key| eq_set.is_superset(unique_key))
{
input_card.min(0..=1)
} else {
input_card.min(0..)
}
}
}
impl PlanVisitor for CardinalityVisitor {
type Result = Cardinality;
type DefaultBehavior = impl DefaultBehavior<Self::Result>;
fn default_behavior() -> Self::DefaultBehavior {
DefaultValue
}
fn visit_logical_values(&mut self, plan: &plan_node::LogicalValues) -> Cardinality {
plan.rows().len().into()
}
fn visit_logical_share(&mut self, plan: &plan_node::LogicalShare) -> Cardinality {
self.visit(plan.input())
}
fn visit_logical_dedup(&mut self, plan: &plan_node::LogicalDedup) -> Cardinality {
let input = self.visit(plan.input());
if plan.dedup_cols().is_empty() {
input.min(1)
} else {
input
}
}
fn visit_logical_over_window(&mut self, plan: &super::LogicalOverWindow) -> Self::Result {
self.visit(plan.input())
}
fn visit_logical_agg(&mut self, plan: &plan_node::LogicalAgg) -> Cardinality {
let input = self.visit(plan.input());
if plan.group_key().is_empty() {
input.min(1)
} else {
input.min(1..)
}
}
fn visit_logical_limit(&mut self, plan: &plan_node::LogicalLimit) -> Cardinality {
self.visit(plan.input()).min(plan.limit() as usize)
}
fn visit_logical_max_one_row(&mut self, plan: &plan_node::LogicalMaxOneRow) -> Cardinality {
self.visit(plan.input()).min(1)
}
fn visit_logical_project(&mut self, plan: &plan_node::LogicalProject) -> Cardinality {
self.visit(plan.input())
}
fn visit_logical_top_n(&mut self, plan: &plan_node::LogicalTopN) -> Cardinality {
let input = self.visit(plan.input());
let each_group = match plan.limit_attr() {
TopNLimit::Simple(limit) => input.sub(plan.offset() as usize).min(limit as usize),
TopNLimit::WithTies(limit) => {
assert_eq!(plan.offset(), 0, "ties with offset is not supported yet");
input.min((limit as usize)..)
}
};
if plan.group_key().is_empty() {
each_group
} else {
let group_number = input.min(1..);
each_group
.mul(group_number)
.min(input)
}
}
fn visit_logical_filter(&mut self, plan: &plan_node::LogicalFilter) -> Cardinality {
let eq_set = plan
.predicate()
.collect_input_refs(plan.input().schema().len())
.ones()
.collect();
Self::visit_predicate(&*plan.input(), self.visit(plan.input()), eq_set)
}
fn visit_logical_scan(&mut self, plan: &plan_node::LogicalScan) -> Cardinality {
let eq_set = plan
.predicate()
.collect_input_refs(plan.table_desc().columns.len())
.ones()
.collect();
Self::visit_predicate(plan, plan.table_cardinality(), eq_set)
}
fn visit_logical_union(&mut self, plan: &plan_node::LogicalUnion) -> Cardinality {
let all = plan
.inputs()
.into_iter()
.map(|input| self.visit(input))
.fold(Cardinality::unknown(), std::ops::Add::add);
if plan.all() {
all
} else {
all.min(1..)
}
}
fn visit_logical_join(&mut self, plan: &plan_node::LogicalJoin) -> Cardinality {
let left = self.visit(plan.left());
let right = self.visit(plan.right());
match plan.join_type() {
JoinType::Unspecified => unreachable!(),
JoinType::Inner => left.mul(right.min(0..)),
JoinType::LeftOuter => left.mul(right.max(1).min(1..)),
JoinType::RightOuter => right.mul(left.max(1).min(1..)),
JoinType::LeftSemi | JoinType::LeftAnti => left.min(0..),
JoinType::RightSemi | JoinType::RightAnti => right.min(0..),
JoinType::FullOuter => Cardinality::unknown(),
JoinType::AsofInner => left.mul(right.min(0..=1)),
JoinType::AsofLeftOuter => left,
}
}
fn visit_logical_now(&mut self, plan: &plan_node::LogicalNow) -> Cardinality {
if plan.max_one_row() {
1.into()
} else {
Cardinality::unknown()
}
}
fn visit_logical_expand(&mut self, plan: &plan_node::LogicalExpand) -> Cardinality {
self.visit(plan.input()) * plan.column_subsets().len()
}
}
#[easy_ext::ext(LogicalCardinalityExt)]
pub impl PlanRef {
fn max_one_row(&self) -> bool {
CardinalityVisitor.visit(self.clone()).is_at_most(1)
}
fn row_count(&self) -> Option<usize> {
CardinalityVisitor.visit(self.clone()).get_exact()
}
}