risingwave_frontend/optimizer/plan_node/
stream_delta_join.rs1use std::assert_matches::assert_matches;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::catalog::ColumnDesc;
19use risingwave_common::util::functional::SameOrElseExt;
20use risingwave_pb::plan_common::JoinType;
21use risingwave_pb::stream_plan::stream_node::NodeBody;
22use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode};
23
24use super::generic::GenericPlanNode;
25use super::stream::prelude::*;
26use super::utils::{Distill, childless_record};
27use super::{ExprRewritable, PlanBase, PlanTreeNodeBinary, StreamPlanRef as PlanRef, generic};
28use crate::expr::{Expr, ExprRewriter, ExprVisitor};
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::IndicesDisplay;
31use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay, TryToStreamPb};
32use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
33use crate::scheduler::SchedulerResult;
34use crate::stream_fragmenter::BuildFragmentGraphState;
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
39pub struct StreamDeltaJoin {
40 pub base: PlanBase<Stream>,
41 core: generic::Join<PlanRef>,
42
43 eq_join_predicate: EqJoinPredicate,
46}
47
48impl StreamDeltaJoin {
49 pub fn new(core: generic::Join<PlanRef>, eq_join_predicate: EqJoinPredicate) -> Result<Self> {
50 let ctx = core.ctx();
51
52 if eq_join_predicate.has_non_eq() {
53 todo!("non-eq condition not supported for delta join");
54 }
55 assert_matches!(
56 core.join_type,
57 JoinType::Inner,
58 "delta join only supports inner join for now"
59 );
60
61 let dist = Distribution::SomeShard;
63
64 let watermark_columns = {
65 let from_left = core
66 .left
67 .watermark_columns()
68 .map_clone(&core.l2i_col_mapping());
69 let from_right = core
70 .right
71 .watermark_columns()
72 .map_clone(&core.r2i_col_mapping());
73 let mut res = WatermarkColumns::new();
74 for (idx, l_wtmk_group) in from_left.iter() {
75 if let Some(r_wtmk_group) = from_right.get_group(idx) {
76 res.insert(
77 idx,
78 l_wtmk_group.same_or_else(r_wtmk_group, || ctx.next_watermark_group_id()),
79 );
80 }
81 }
82 res.map_clone(&core.i2o_col_mapping())
83 };
84
85 let base = PlanBase::new_stream_with_core(
87 &core,
88 dist,
89 core.stream_kind()?,
90 false, watermark_columns,
92 MonotonicityMap::new(), );
94
95 Ok(Self {
96 base,
97 core,
98 eq_join_predicate,
99 })
100 }
101
102 pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
104 &self.eq_join_predicate
105 }
106}
107
108impl Distill for StreamDeltaJoin {
109 fn distill<'a>(&self) -> XmlNode<'a> {
110 let verbose = self.base.ctx().is_explain_verbose();
111 let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
112 vec.push(("type", Pretty::debug(&self.core.join_type)));
113
114 let concat_schema = self.core.concat_schema();
115 vec.push((
116 "predicate",
117 Pretty::debug(&EqJoinPredicateDisplay {
118 eq_join_predicate: self.eq_join_predicate(),
119 input_schema: &concat_schema,
120 }),
121 ));
122
123 if verbose {
124 let data = IndicesDisplay::from_join(&self.core, &concat_schema);
125 vec.push(("output", data));
126 }
127
128 childless_record("StreamDeltaJoin", vec)
129 }
130}
131
132impl PlanTreeNodeBinary<Stream> for StreamDeltaJoin {
133 fn left(&self) -> PlanRef {
134 self.core.left.clone()
135 }
136
137 fn right(&self) -> PlanRef {
138 self.core.right.clone()
139 }
140
141 fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
142 let mut core = self.core.clone();
143 core.left = left;
144 core.right = right;
145 Self::new(core, self.eq_join_predicate.clone()).unwrap()
146 }
147}
148
149impl_plan_tree_node_for_binary! { Stream, StreamDeltaJoin }
150
151impl TryToStreamPb for StreamDeltaJoin {
152 fn try_to_stream_prost_body(
153 &self,
154 _state: &mut BuildFragmentGraphState,
155 ) -> SchedulerResult<NodeBody> {
156 let left = self.left();
157 let right = self.right();
158
159 let left_table = if let Some(stream_table_scan) = left.as_stream_table_scan() {
160 stream_table_scan.core()
161 } else {
162 unreachable!();
163 };
164 let left_table_catalog = &*left_table.table_catalog;
165 let right_table = if let Some(stream_table_scan) = right.as_stream_table_scan() {
166 stream_table_scan.core()
167 } else {
168 unreachable!();
169 };
170 let right_table_catalog = &*right_table.table_catalog;
171
172 let eq_join_predicate = &self.eq_join_predicate;
175 Ok(NodeBody::DeltaIndexJoin(Box::new(DeltaIndexJoinNode {
176 join_type: self.core.join_type as i32,
177 left_key: eq_join_predicate
178 .left_eq_indexes()
179 .iter()
180 .map(|v| *v as i32)
181 .collect(),
182 right_key: eq_join_predicate
183 .right_eq_indexes()
184 .iter()
185 .map(|v| *v as i32)
186 .collect(),
187 condition: eq_join_predicate
188 .other_cond()
189 .as_expr_unless_true()
190 .map(|x| x.to_expr_proto()),
191 left_table_id: left_table_catalog.id.table_id(),
192 right_table_id: right_table_catalog.id.table_id(),
193 left_info: Some(ArrangementInfo {
194 arrange_key_orders: left_table_catalog.arrange_key_orders_protobuf(),
196 column_descs: left_table
198 .column_descs()
199 .iter()
200 .map(ColumnDesc::to_protobuf)
201 .collect(),
202 table_desc: Some(left_table_catalog.table_desc().try_to_protobuf()?),
203 output_col_idx: left_table
204 .output_col_idx
205 .iter()
206 .map(|&v| v as u32)
207 .collect(),
208 }),
209 right_info: Some(ArrangementInfo {
210 arrange_key_orders: right_table_catalog.arrange_key_orders_protobuf(),
212 column_descs: right_table
214 .column_descs()
215 .iter()
216 .map(ColumnDesc::to_protobuf)
217 .collect(),
218 table_desc: Some(right_table_catalog.table_desc().try_to_protobuf()?),
219 output_col_idx: right_table
220 .output_col_idx
221 .iter()
222 .map(|&v| v as u32)
223 .collect(),
224 }),
225 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
226 })))
227 }
228}
229
230impl ExprRewritable<Stream> for StreamDeltaJoin {
231 fn has_rewritable_expr(&self) -> bool {
232 true
233 }
234
235 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
236 let mut core = self.core.clone();
237 core.rewrite_exprs(r);
238 Self::new(core, self.eq_join_predicate.rewrite_exprs(r))
239 .unwrap()
240 .into()
241 }
242}
243impl ExprVisitable for StreamDeltaJoin {
244 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
245 self.core.visit_exprs(v);
246 }
247}