risingwave_frontend/optimizer/plan_node/
stream_delta_join.rs1use std::assert_matches::assert_matches;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::catalog::ColumnDesc;
19use risingwave_common::util::functional::SameOrElseExt;
20use risingwave_pb::plan_common::JoinType;
21use risingwave_pb::stream_plan::stream_node::NodeBody;
22use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode};
23
24use super::generic::GenericPlanNode;
25use super::stream::prelude::*;
26use super::utils::{Distill, childless_record};
27use super::{ExprRewritable, PlanBase, PlanTreeNodeBinary, StreamPlanRef as PlanRef, generic};
28use crate::expr::{Expr, ExprRewriter, ExprVisitor};
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::IndicesDisplay;
31use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay, TryToStreamPb};
32use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
33use crate::scheduler::SchedulerResult;
34use crate::stream_fragmenter::BuildFragmentGraphState;
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
39pub struct StreamDeltaJoin {
40 pub base: PlanBase<Stream>,
41 core: generic::Join<PlanRef>,
42
43 eq_join_predicate: EqJoinPredicate,
46}
47
48impl StreamDeltaJoin {
49 pub fn new(core: generic::Join<PlanRef>, eq_join_predicate: EqJoinPredicate) -> Result<Self> {
50 let ctx = core.ctx();
51
52 if eq_join_predicate.has_non_eq() {
53 todo!("non-eq condition not supported for delta join");
54 }
55 assert_matches!(
56 core.join_type,
57 JoinType::Inner,
58 "delta join only supports inner join for now"
59 );
60
61 let dist = Distribution::SomeShard;
63
64 let watermark_columns = {
65 let from_left = core
66 .left
67 .watermark_columns()
68 .map_clone(&core.l2i_col_mapping());
69 let from_right = core
70 .right
71 .watermark_columns()
72 .map_clone(&core.r2i_col_mapping());
73 let mut res = WatermarkColumns::new();
74 for (idx, l_wtmk_group) in from_left.iter() {
75 if let Some(r_wtmk_group) = from_right.get_group(idx) {
76 res.insert(
77 idx,
78 l_wtmk_group.same_or_else(r_wtmk_group, || ctx.next_watermark_group_id()),
79 );
80 }
81 }
82 res.map_clone(&core.i2o_col_mapping())
83 };
84
85 let base = PlanBase::new_stream_with_core(
87 &core,
88 dist,
89 core.stream_kind()?,
90 false, watermark_columns,
92 MonotonicityMap::new(), );
94
95 Ok(Self {
96 base,
97 core,
98 eq_join_predicate,
99 })
100 }
101
102 pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
104 &self.eq_join_predicate
105 }
106}
107
108impl Distill for StreamDeltaJoin {
109 fn distill<'a>(&self) -> XmlNode<'a> {
110 let verbose = self.base.ctx().is_explain_verbose();
111 let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
112 vec.push(("type", Pretty::debug(&self.core.join_type)));
113
114 let concat_schema = self.core.concat_schema();
115 vec.push((
116 "predicate",
117 Pretty::debug(&EqJoinPredicateDisplay {
118 eq_join_predicate: self.eq_join_predicate(),
119 input_schema: &concat_schema,
120 }),
121 ));
122
123 if verbose {
124 let data = IndicesDisplay::from_join(&self.core, &concat_schema);
125 vec.push(("output", data));
126 }
127
128 childless_record("StreamDeltaJoin", vec)
129 }
130}
131
132impl PlanTreeNodeBinary<Stream> for StreamDeltaJoin {
133 fn left(&self) -> PlanRef {
134 self.core.left.clone()
135 }
136
137 fn right(&self) -> PlanRef {
138 self.core.right.clone()
139 }
140
141 fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
142 let mut core = self.core.clone();
143 core.left = left;
144 core.right = right;
145 Self::new(core, self.eq_join_predicate.clone()).unwrap()
146 }
147}
148
149impl_plan_tree_node_for_binary! { Stream, StreamDeltaJoin }
150
151impl TryToStreamPb for StreamDeltaJoin {
152 fn try_to_stream_prost_body(
153 &self,
154 _state: &mut BuildFragmentGraphState,
155 ) -> SchedulerResult<NodeBody> {
156 let left = self.left();
157 let right = self.right();
158 let retract = left.stream_kind().is_retract() || right.stream_kind().is_retract();
159
160 let left_table = if let Some(stream_table_scan) = left.as_stream_table_scan() {
161 stream_table_scan.core()
162 } else {
163 unreachable!();
164 };
165 let left_table_catalog = &*left_table.table_catalog;
166 let right_table = if let Some(stream_table_scan) = right.as_stream_table_scan() {
167 stream_table_scan.core()
168 } else {
169 unreachable!();
170 };
171 let right_table_catalog = &*right_table.table_catalog;
172
173 let eq_join_predicate = &self.eq_join_predicate;
176 Ok(NodeBody::DeltaIndexJoin(Box::new(DeltaIndexJoinNode {
177 join_type: self.core.join_type as i32,
178 left_key: eq_join_predicate
179 .left_eq_indexes()
180 .iter()
181 .map(|v| *v as i32)
182 .collect(),
183 right_key: eq_join_predicate
184 .right_eq_indexes()
185 .iter()
186 .map(|v| *v as i32)
187 .collect(),
188 condition: eq_join_predicate
189 .other_cond()
190 .as_expr_unless_true()
191 .map(|expr| expr.to_expr_proto_checked_pure(retract, "JOIN condition"))
192 .transpose()?,
193 left_table_id: left_table_catalog.id,
194 right_table_id: right_table_catalog.id,
195 left_info: Some(ArrangementInfo {
196 arrange_key_orders: left_table_catalog.arrange_key_orders_protobuf(),
198 column_descs: left_table
200 .column_descs()
201 .iter()
202 .map(ColumnDesc::to_protobuf)
203 .collect(),
204 table_desc: Some(left_table_catalog.table_desc().try_to_protobuf()?),
205 output_col_idx: left_table
206 .output_col_idx
207 .iter()
208 .map(|&v| v as u32)
209 .collect(),
210 }),
211 right_info: Some(ArrangementInfo {
212 arrange_key_orders: right_table_catalog.arrange_key_orders_protobuf(),
214 column_descs: right_table
216 .column_descs()
217 .iter()
218 .map(ColumnDesc::to_protobuf)
219 .collect(),
220 table_desc: Some(right_table_catalog.table_desc().try_to_protobuf()?),
221 output_col_idx: right_table
222 .output_col_idx
223 .iter()
224 .map(|&v| v as u32)
225 .collect(),
226 }),
227 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
228 })))
229 }
230}
231
232impl ExprRewritable<Stream> for StreamDeltaJoin {
233 fn has_rewritable_expr(&self) -> bool {
234 true
235 }
236
237 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
238 let mut core = self.core.clone();
239 core.rewrite_exprs(r);
240 Self::new(core, self.eq_join_predicate.rewrite_exprs(r))
241 .unwrap()
242 .into()
243 }
244}
245impl ExprVisitable for StreamDeltaJoin {
246 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
247 self.core.visit_exprs(v);
248 }
249}