risingwave_frontend/optimizer/plan_node/
stream_asof_join.rs1use itertools::Itertools;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::util::functional::SameOrElseExt;
18use risingwave_common::util::sort_util::OrderType;
19use risingwave_pb::plan_common::{AsOfJoinDesc, AsOfJoinType, JoinType};
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21use risingwave_pb::stream_plan::{AsOfJoinNode, PbJoinEncodingType};
22
23use super::stream::prelude::*;
24use super::utils::{
25 Distill, TableCatalogBuilder, childless_record, plan_node_name, watermark_pretty,
26};
27use super::{
28 ExprRewritable, LogicalJoin, PlanBase, PlanTreeNodeBinary, StreamJoinCommon, StreamNode,
29 StreamPlanRef as PlanRef, generic,
30};
31use crate::TableCatalog;
32use crate::expr::{ExprRewriter, ExprVisitor};
33use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
34use crate::optimizer::plan_node::generic::GenericPlanNode;
35use crate::optimizer::plan_node::utils::IndicesDisplay;
36use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
37use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
38use crate::stream_fragmenter::BuildFragmentGraphState;
39
/// Stream plan node for an AS OF join. `new` only accepts the join types
/// `JoinType::AsofInner` and `JoinType::AsofLeftOuter`.
40#[derive(Debug, Clone, PartialEq, Eq, Hash)]
42pub struct StreamAsOfJoin {
    /// Plan base produced by `PlanBase::new_stream_with_core` in `new`.
43 pub base: PlanBase<Stream>,
    /// Generic join core: holds both inputs, the join type and the output indices.
44 core: generic::Join<PlanRef>,
45
    /// Equi-join part of the join condition; supplies the join key indices and the
    /// null-safe flags sent to the executor.
46 eq_join_predicate: EqJoinPredicate,
49
    /// Result of `stream_kind.is_append_only()` captured in `new`; shown as the
    /// `append_only` flag in the plan node name.
50 is_append_only: bool,
53
    /// Descriptor of the inequality condition. Its `left_idx` / `right_idx` are used as the
    /// inequality key columns of the two inputs, and it is forwarded as `asof_desc`.
54 inequality_desc: AsOfJoinDesc,
56}
57
58impl StreamAsOfJoin {
59 pub fn new(
60 core: generic::Join<PlanRef>,
61 eq_join_predicate: EqJoinPredicate,
62 inequality_desc: AsOfJoinDesc,
63 ) -> Result<Self> {
64 assert!(core.join_type == JoinType::AsofInner || core.join_type == JoinType::AsofLeftOuter);
65
66 let stream_kind = core.stream_kind()?;
67
68 let dist = StreamJoinCommon::derive_dist(
69 core.left.distribution(),
70 core.right.distribution(),
71 &core,
72 );
73
74 let watermark_columns = {
75 let l2i = core.l2i_col_mapping();
76 let r2i = core.r2i_col_mapping();
77 let mut watermark_columns = WatermarkColumns::new();
78 for (left_idx, right_idx) in
79 eq_join_predicate
80 .eq_indexes()
81 .into_iter()
82 .chain(std::iter::once((
83 inequality_desc.left_idx as usize,
84 inequality_desc.right_idx as usize,
85 )))
86 {
87 if let Some(l_wtmk_group) = core.left.watermark_columns().get_group(left_idx)
88 && let Some(r_wtmk_group) = core.right.watermark_columns().get_group(right_idx)
89 {
90 if let Some(internal) = l2i.try_map(left_idx) {
91 watermark_columns.insert(
92 internal,
93 l_wtmk_group.same_or_else(r_wtmk_group, || {
94 core.ctx().next_watermark_group_id()
95 }),
96 );
97 }
98 if let Some(internal) = r2i.try_map(right_idx) {
99 watermark_columns.insert(
100 internal,
101 l_wtmk_group.same_or_else(r_wtmk_group, || {
102 core.ctx().next_watermark_group_id()
103 }),
104 );
105 }
106 }
107 }
108 watermark_columns.map_clone(&core.i2o_col_mapping())
109 };
110
111 let base = PlanBase::new_stream_with_core(
113 &core,
114 dist,
115 stream_kind,
116 false, watermark_columns,
118 MonotonicityMap::new(), );
120
121 Ok(Self {
122 base,
123 core,
124 eq_join_predicate,
125 is_append_only: stream_kind.is_append_only(),
126 inequality_desc,
127 })
128 }
129
130 pub fn join_type(&self) -> JoinType {
132 self.core.join_type
133 }
134
135 pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
137 &self.eq_join_predicate
138 }
139
140 pub fn derive_dist_key_in_join_key(&self) -> Vec<usize> {
141 let left_dk_indices = self.left().distribution().dist_column_indices().to_vec();
142 let right_dk_indices = self.right().distribution().dist_column_indices().to_vec();
143
144 StreamJoinCommon::get_dist_key_in_join_key(
145 &left_dk_indices,
146 &right_dk_indices,
147 self.eq_join_predicate(),
148 )
149 }
150
151 pub fn infer_internal_table_catalog<I: StreamPlanNodeMetadata>(
153 input: I,
154 join_key_indices: Vec<usize>,
155 dk_indices_in_jk: Vec<usize>,
156 inequality_key_idx: usize,
157 ) -> (TableCatalog, Vec<usize>) {
158 let schema = input.schema();
159
160 let internal_table_dist_keys = dk_indices_in_jk
161 .iter()
162 .map(|idx| join_key_indices[*idx])
163 .collect_vec();
164
165 let join_key_len = join_key_indices.len();
167 let mut pk_indices = join_key_indices;
168
169 let mut deduped_input_pk_indices = vec![];
171 for input_pk_idx in input.stream_key().unwrap() {
172 if !pk_indices.contains(input_pk_idx)
173 && !deduped_input_pk_indices.contains(input_pk_idx)
174 {
175 deduped_input_pk_indices.push(*input_pk_idx);
176 }
177 }
178
179 pk_indices.push(inequality_key_idx);
180 pk_indices.extend(deduped_input_pk_indices.clone());
181
182 let mut internal_table_catalog_builder = TableCatalogBuilder::default();
184 let internal_columns_fields = schema.fields().to_vec();
185
186 internal_columns_fields.iter().for_each(|field| {
187 internal_table_catalog_builder.add_column(field);
188 });
189 pk_indices.iter().for_each(|idx| {
190 internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
191 });
192
193 internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk);
194
195 (
196 internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len),
197 deduped_input_pk_indices,
198 )
199 }
200}
201
202impl Distill for StreamAsOfJoin {
203 fn distill<'a>(&self) -> XmlNode<'a> {
204 let (ljk, rjk) = self
205 .eq_join_predicate
206 .eq_indexes()
207 .first()
208 .cloned()
209 .expect("first join key");
210
211 let name = plan_node_name!("StreamAsOfJoin",
212 { "window", self.left().watermark_columns().contains(ljk) && self.right().watermark_columns().contains(rjk) },
213 { "append_only", self.is_append_only },
214 );
215 let verbose = self.base.ctx().is_explain_verbose();
216 let mut vec = Vec::with_capacity(6);
217 vec.push(("type", Pretty::debug(&self.core.join_type)));
218
219 let concat_schema = self.core.concat_schema();
220 vec.push((
221 "predicate",
222 Pretty::debug(&EqJoinPredicateDisplay {
223 eq_join_predicate: self.eq_join_predicate(),
224 input_schema: &concat_schema,
225 }),
226 ));
227
228 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
229 vec.push(("output_watermarks", ow));
230 }
231
232 if verbose {
233 let data = IndicesDisplay::from_join(&self.core, &concat_schema);
234 vec.push(("output", data));
235 }
236
237 childless_record(name, vec)
238 }
239}
240
241impl PlanTreeNodeBinary<Stream> for StreamAsOfJoin {
242 fn left(&self) -> PlanRef {
243 self.core.left.clone()
244 }
245
246 fn right(&self) -> PlanRef {
247 self.core.right.clone()
248 }
249
250 fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
251 let mut core = self.core.clone();
252 core.left = left;
253 core.right = right;
254
255 Self::new(core, self.eq_join_predicate.clone(), self.inequality_desc).unwrap()
256 }
257}
258
// NOTE(review): presumably generates the generic plan-tree-node impl for `StreamAsOfJoin`
// from its `PlanTreeNodeBinary<Stream>` impl — the macro is defined elsewhere; confirm there.
259impl_plan_tree_node_for_binary! { Stream, StreamAsOfJoin }
260
261impl StreamNode for StreamAsOfJoin {
262 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
263 let left_jk_indices = self.eq_join_predicate.left_eq_indexes();
264 let right_jk_indices = self.eq_join_predicate.right_eq_indexes();
265 let left_jk_indices_prost = left_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
266 let right_jk_indices_prost = right_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
267
268 let dk_indices_in_jk = self.derive_dist_key_in_join_key();
269
270 let (left_table, left_deduped_input_pk_indices) = Self::infer_internal_table_catalog(
271 self.left().plan_base(),
272 left_jk_indices,
273 dk_indices_in_jk.clone(),
274 self.inequality_desc.left_idx as usize,
275 );
276 let (right_table, right_deduped_input_pk_indices) = Self::infer_internal_table_catalog(
277 self.right().plan_base(),
278 right_jk_indices,
279 dk_indices_in_jk,
280 self.inequality_desc.right_idx as usize,
281 );
282
283 let left_deduped_input_pk_indices = left_deduped_input_pk_indices
284 .iter()
285 .map(|idx| *idx as u32)
286 .collect_vec();
287
288 let right_deduped_input_pk_indices = right_deduped_input_pk_indices
289 .iter()
290 .map(|idx| *idx as u32)
291 .collect_vec();
292
293 let left_table = left_table.with_id(state.gen_table_id_wrapped());
294 let right_table = right_table.with_id(state.gen_table_id_wrapped());
295
296 let null_safe_prost = self.eq_join_predicate.null_safes().into_iter().collect();
297
298 let asof_join_type = match self.core.join_type {
299 JoinType::AsofInner => AsOfJoinType::Inner,
300 JoinType::AsofLeftOuter => AsOfJoinType::LeftOuter,
301 _ => unreachable!(),
302 };
303
304 NodeBody::AsOfJoin(Box::new(AsOfJoinNode {
305 join_type: asof_join_type.into(),
306 left_key: left_jk_indices_prost,
307 right_key: right_jk_indices_prost,
308 null_safe: null_safe_prost,
309 left_table: Some(left_table.to_internal_table_prost()),
310 right_table: Some(right_table.to_internal_table_prost()),
311 left_deduped_input_pk_indices,
312 right_deduped_input_pk_indices,
313 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
314 asof_desc: Some(self.inequality_desc),
315 #[allow(deprecated)]
317 join_encoding_type: PbJoinEncodingType::Unspecified as _,
318 }))
319 }
320}
321
322impl ExprRewritable<Stream> for StreamAsOfJoin {
323 fn has_rewritable_expr(&self) -> bool {
324 true
325 }
326
327 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
328 let mut core = self.core.clone();
329 core.rewrite_exprs(r);
330 let eq_join_predicate = self.eq_join_predicate.rewrite_exprs(r);
331 let desc = LogicalJoin::get_inequality_desc_from_predicate(
332 eq_join_predicate.other_cond().clone(),
333 core.left.schema().len(),
334 )
335 .unwrap();
336
337 Self::new(core, eq_join_predicate, desc).unwrap().into()
338 }
339}
340
341impl ExprVisitable for StreamAsOfJoin {
342 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
343 self.core.visit_exprs(v);
344 }
345}