// risingwave_frontend/optimizer/plan_node/stream_delta_join.rs
use std::ops::BitAnd;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::ColumnDesc;
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode};
use super::stream::prelude::*;
use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary};
use crate::expr::{Expr, ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay, TryToStreamPb};
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::ColIndexMappingRewriteExt;
/// Stream plan node that executes a join as a delta join.
///
/// The constructor only accepts inner joins whose predicate has no non-equal part, and
/// protobuf serialization expects both inputs to be stream table scans.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamDeltaJoin {
/// Stream plan base built in `new` from the join core.
pub base: PlanBase<Stream>,
/// The generic join this node wraps (inputs, join type, output indices).
core: generic::Join<PlanRef>,
/// Join predicate with its equal part separated out; exposed through
/// `eq_join_predicate()` and used to derive the left/right join keys.
eq_join_predicate: EqJoinPredicate,
}
impl StreamDeltaJoin {
    /// Builds a delta-join stream node from a join `core` and its equi-join predicate.
    ///
    /// The node is append-only only when both inputs are append-only, and its distribution
    /// is always `Distribution::SomeShard`.
    ///
    /// # Panics
    ///
    /// Hits `todo!` when `core.join_type` is anything other than `JoinType::Inner`, or when
    /// `eq_join_predicate` carries a non-equal condition.
    pub fn new(core: generic::Join<PlanRef>, eq_join_predicate: EqJoinPredicate) -> Self {
        // The join-type check runs first, so it decides which `todo!` fires when both
        // restrictions are violated.
        if !matches!(core.join_type, JoinType::Inner) {
            todo!("delta join only supports inner join for now");
        }
        let append_only = core.left.append_only() && core.right.append_only();

        if eq_join_predicate.has_non_eq() {
            todo!("non-eq condition not supported for delta join");
        }

        // Map each side's watermark columns through its own mapping (`l2i` / `r2i`), keep
        // the bits set on both sides, then translate the result through `i2o`.
        let left_watermarks = core
            .l2i_col_mapping()
            .rewrite_bitset(core.left.watermark_columns());
        let right_watermarks = core
            .r2i_col_mapping()
            .rewrite_bitset(core.right.watermark_columns());
        let shared_watermarks = left_watermarks.bitand(&right_watermarks);
        let watermark_columns = core.i2o_col_mapping().rewrite_bitset(&shared_watermarks);

        let base = PlanBase::new_stream_with_core(
            &core,
            Distribution::SomeShard,
            append_only,
            // NOTE(review): presumably the emit-on-window-close flag — confirm against
            // `PlanBase::new_stream_with_core`.
            false,
            watermark_columns,
            // NOTE(review): a fresh map, i.e. presumably no monotonicity is derived — confirm.
            MonotonicityMap::new(),
        );

        Self {
            base,
            core,
            eq_join_predicate,
        }
    }

    /// Returns a reference to the equi-join predicate held by this node.
    pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
        &self.eq_join_predicate
    }
}
impl Distill for StreamDeltaJoin {
    /// Produces the `StreamDeltaJoin` record: the join type and predicate are always
    /// included, and the output indices are appended when explain is verbose.
    fn distill<'a>(&self) -> XmlNode<'a> {
        let is_verbose = self.base.ctx().is_explain_verbose();
        let schema = self.core.concat_schema();

        // The predicate is displayed against the concatenated schema of the join core.
        let predicate = EqJoinPredicateDisplay {
            eq_join_predicate: self.eq_join_predicate(),
            input_schema: &schema,
        };

        let field_count = if is_verbose { 3 } else { 2 };
        let mut fields = Vec::with_capacity(field_count);
        fields.push(("type", Pretty::debug(&self.core.join_type)));
        fields.push(("predicate", Pretty::debug(&predicate)));
        if is_verbose {
            fields.push(("output", IndicesDisplay::from_join(&self.core, &schema)));
        }

        childless_record("StreamDeltaJoin", fields)
    }
}
impl PlanTreeNodeBinary for StreamDeltaJoin {
    /// Returns a clone of the left input held by the join core.
    fn left(&self) -> PlanRef {
        Clone::clone(&self.core.left)
    }

    /// Returns a clone of the right input held by the join core.
    fn right(&self) -> PlanRef {
        Clone::clone(&self.core.right)
    }

    /// Rebuilds the node through `Self::new` with both inputs replaced, reusing a clone of
    /// the current predicate so every derived property is recomputed.
    fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
        let replaced_core = {
            let mut cloned = self.core.clone();
            cloned.left = left;
            cloned.right = right;
            cloned
        };
        let predicate = self.eq_join_predicate.clone();
        Self::new(replaced_core, predicate)
    }
}
// NOTE(review): presumably generates the generic plan-tree-node impl from the
// `PlanTreeNodeBinary` impl above — confirm against the macro definition.
impl_plan_tree_node_for_binary! { StreamDeltaJoin }
impl TryToStreamPb for StreamDeltaJoin {
    /// Serializes this node into a `NodeBody::DeltaIndexJoin` protobuf body.
    ///
    /// Both inputs must be stream table scans; any other input kind hits `unreachable!`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `try_to_protobuf` on either side's table descriptor
    /// (the left side is converted first).
    fn try_to_stream_prost_body(
        &self,
        _state: &mut BuildFragmentGraphState,
    ) -> SchedulerResult<NodeBody> {
        let left_input = self.left();
        let right_input = self.right();

        let left_scan = match left_input.as_stream_table_scan() {
            Some(scan) => scan.core(),
            None => unreachable!(),
        };
        let left_desc = &*left_scan.table_desc;

        let right_scan = match right_input.as_stream_table_scan() {
            Some(scan) => scan.core(),
            None => unreachable!(),
        };
        let right_desc = &*right_scan.table_desc;

        // Join keys and the residual condition all come from the split predicate.
        let predicate = &self.eq_join_predicate;
        let left_key = predicate
            .left_eq_indexes()
            .iter()
            .map(|&idx| idx as i32)
            .collect();
        let right_key = predicate
            .right_eq_indexes()
            .iter()
            .map(|&idx| idx as i32)
            .collect();
        let condition = predicate
            .other_cond()
            .as_expr_unless_true()
            .map(|expr| expr.to_expr_proto());

        // Arrangement description for the left side; built before the right side so a
        // failing `try_to_protobuf` on the left is reported first.
        let left_info = ArrangementInfo {
            arrange_key_orders: left_desc.arrange_key_orders_protobuf(),
            column_descs: left_scan
                .column_descs()
                .iter()
                .map(ColumnDesc::to_protobuf)
                .collect(),
            table_desc: Some(left_desc.try_to_protobuf()?),
            output_col_idx: left_scan
                .output_col_idx
                .iter()
                .map(|&col| col as u32)
                .collect(),
        };

        // Arrangement description for the right side.
        let right_info = ArrangementInfo {
            arrange_key_orders: right_desc.arrange_key_orders_protobuf(),
            column_descs: right_scan
                .column_descs()
                .iter()
                .map(ColumnDesc::to_protobuf)
                .collect(),
            table_desc: Some(right_desc.try_to_protobuf()?),
            output_col_idx: right_scan
                .output_col_idx
                .iter()
                .map(|&col| col as u32)
                .collect(),
        };

        let node = DeltaIndexJoinNode {
            join_type: self.core.join_type as i32,
            left_key,
            right_key,
            condition,
            left_table_id: left_desc.table_id.table_id(),
            right_table_id: right_desc.table_id.table_id(),
            left_info: Some(left_info),
            right_info: Some(right_info),
            output_indices: self
                .core
                .output_indices
                .iter()
                .map(|&idx| idx as u32)
                .collect(),
        };

        Ok(NodeBody::DeltaIndexJoin(node))
    }
}
impl ExprRewritable for StreamDeltaJoin {
    /// Always `true`: this node reports that it carries rewritable expressions.
    fn has_rewritable_expr(&self) -> bool {
        true
    }

    /// Applies `r` to a clone of the join core and then to the equi-join predicate, and
    /// rebuilds the node from the rewritten parts.
    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
        // The core is rewritten before the predicate; keep this order, since the rewriter
        // is passed mutably and may carry state between calls.
        let rewritten_core = {
            let mut cloned = self.core.clone();
            cloned.rewrite_exprs(r);
            cloned
        };
        let rewritten_predicate = self.eq_join_predicate.rewrite_exprs(r);

        Self::new(rewritten_core, rewritten_predicate).into()
    }
}
impl ExprVisitable for StreamDeltaJoin {
    /// Forwards the visitor to the join core; only the core's expressions are visited here.
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        let Self { core, .. } = self;
        core.visit_exprs(v);
    }
}