risingwave_frontend/optimizer/plan_node/
stream_delta_join.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::ColumnDesc;
17use risingwave_common::util::functional::SameOrElseExt;
18use risingwave_pb::plan_common::JoinType;
19use risingwave_pb::stream_plan::stream_node::NodeBody;
20use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode};
21
22use super::generic::GenericPlanNode;
23use super::stream::prelude::*;
24use super::utils::{Distill, childless_record};
25use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, generic};
26use crate::expr::{Expr, ExprRewriter, ExprVisitor};
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::utils::IndicesDisplay;
29use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay, TryToStreamPb};
30use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
31use crate::scheduler::SchedulerResult;
32use crate::stream_fragmenter::BuildFragmentGraphState;
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct StreamDeltaJoin {
38 pub base: PlanBase<Stream>,
39 core: generic::Join<PlanRef>,
40
41 eq_join_predicate: EqJoinPredicate,
44}
45
46impl StreamDeltaJoin {
47 pub fn new(core: generic::Join<PlanRef>, eq_join_predicate: EqJoinPredicate) -> Self {
48 let ctx = core.ctx();
49
50 let append_only = match core.join_type {
52 JoinType::Inner => core.left.append_only() && core.right.append_only(),
53 _ => todo!("delta join only supports inner join for now"),
54 };
55 if eq_join_predicate.has_non_eq() {
56 todo!("non-eq condition not supported for delta join");
57 }
58
59 let dist = Distribution::SomeShard;
61
62 let watermark_columns = {
63 let from_left = core
64 .left
65 .watermark_columns()
66 .map_clone(&core.l2i_col_mapping());
67 let from_right = core
68 .right
69 .watermark_columns()
70 .map_clone(&core.r2i_col_mapping());
71 let mut res = WatermarkColumns::new();
72 for (idx, l_wtmk_group) in from_left.iter() {
73 if let Some(r_wtmk_group) = from_right.get_group(idx) {
74 res.insert(
75 idx,
76 l_wtmk_group.same_or_else(r_wtmk_group, || ctx.next_watermark_group_id()),
77 );
78 }
79 }
80 res.map_clone(&core.i2o_col_mapping())
81 };
82
83 let base = PlanBase::new_stream_with_core(
85 &core,
86 dist,
87 append_only,
88 false, watermark_columns,
90 MonotonicityMap::new(), );
92
93 Self {
94 base,
95 core,
96 eq_join_predicate,
97 }
98 }
99
100 pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
102 &self.eq_join_predicate
103 }
104}
105
106impl Distill for StreamDeltaJoin {
107 fn distill<'a>(&self) -> XmlNode<'a> {
108 let verbose = self.base.ctx().is_explain_verbose();
109 let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
110 vec.push(("type", Pretty::debug(&self.core.join_type)));
111
112 let concat_schema = self.core.concat_schema();
113 vec.push((
114 "predicate",
115 Pretty::debug(&EqJoinPredicateDisplay {
116 eq_join_predicate: self.eq_join_predicate(),
117 input_schema: &concat_schema,
118 }),
119 ));
120
121 if verbose {
122 let data = IndicesDisplay::from_join(&self.core, &concat_schema);
123 vec.push(("output", data));
124 }
125
126 childless_record("StreamDeltaJoin", vec)
127 }
128}
129
130impl PlanTreeNodeBinary for StreamDeltaJoin {
131 fn left(&self) -> PlanRef {
132 self.core.left.clone()
133 }
134
135 fn right(&self) -> PlanRef {
136 self.core.right.clone()
137 }
138
139 fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
140 let mut core = self.core.clone();
141 core.left = left;
142 core.right = right;
143 Self::new(core, self.eq_join_predicate.clone())
144 }
145}
146
147impl_plan_tree_node_for_binary! { StreamDeltaJoin }
148
149impl TryToStreamPb for StreamDeltaJoin {
150 fn try_to_stream_prost_body(
151 &self,
152 _state: &mut BuildFragmentGraphState,
153 ) -> SchedulerResult<NodeBody> {
154 let left = self.left();
155 let right = self.right();
156
157 let left_table = if let Some(stream_table_scan) = left.as_stream_table_scan() {
158 stream_table_scan.core()
159 } else {
160 unreachable!();
161 };
162 let left_table_desc = &*left_table.table_desc;
163 let right_table = if let Some(stream_table_scan) = right.as_stream_table_scan() {
164 stream_table_scan.core()
165 } else {
166 unreachable!();
167 };
168 let right_table_desc = &*right_table.table_desc;
169
170 let eq_join_predicate = &self.eq_join_predicate;
173 Ok(NodeBody::DeltaIndexJoin(Box::new(DeltaIndexJoinNode {
174 join_type: self.core.join_type as i32,
175 left_key: eq_join_predicate
176 .left_eq_indexes()
177 .iter()
178 .map(|v| *v as i32)
179 .collect(),
180 right_key: eq_join_predicate
181 .right_eq_indexes()
182 .iter()
183 .map(|v| *v as i32)
184 .collect(),
185 condition: eq_join_predicate
186 .other_cond()
187 .as_expr_unless_true()
188 .map(|x| x.to_expr_proto()),
189 left_table_id: left_table_desc.table_id.table_id(),
190 right_table_id: right_table_desc.table_id.table_id(),
191 left_info: Some(ArrangementInfo {
192 arrange_key_orders: left_table_desc.arrange_key_orders_protobuf(),
194 column_descs: left_table
196 .column_descs()
197 .iter()
198 .map(ColumnDesc::to_protobuf)
199 .collect(),
200 table_desc: Some(left_table_desc.try_to_protobuf()?),
201 output_col_idx: left_table
202 .output_col_idx
203 .iter()
204 .map(|&v| v as u32)
205 .collect(),
206 }),
207 right_info: Some(ArrangementInfo {
208 arrange_key_orders: right_table_desc.arrange_key_orders_protobuf(),
210 column_descs: right_table
212 .column_descs()
213 .iter()
214 .map(ColumnDesc::to_protobuf)
215 .collect(),
216 table_desc: Some(right_table_desc.try_to_protobuf()?),
217 output_col_idx: right_table
218 .output_col_idx
219 .iter()
220 .map(|&v| v as u32)
221 .collect(),
222 }),
223 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
224 })))
225 }
226}
227
228impl ExprRewritable for StreamDeltaJoin {
229 fn has_rewritable_expr(&self) -> bool {
230 true
231 }
232
233 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
234 let mut core = self.core.clone();
235 core.rewrite_exprs(r);
236 Self::new(core, self.eq_join_predicate.rewrite_exprs(r)).into()
237 }
238}
239impl ExprVisitable for StreamDeltaJoin {
240 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
241 self.core.visit_exprs(v);
242 }
243}