risingwave_frontend/optimizer/plan_node/
stream_asof_join.rs1use itertools::Itertools;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::util::functional::SameOrElseExt;
18use risingwave_common::util::sort_util::OrderType;
19use risingwave_pb::plan_common::{AsOfJoinDesc, AsOfJoinType, JoinType};
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21use risingwave_pb::stream_plan::{AsOfJoinNode, PbJoinEncodingType};
22
23use super::stream::prelude::*;
24use super::utils::{
25 Distill, TableCatalogBuilder, childless_record, plan_node_name, watermark_pretty,
26};
27use super::{
28 ExprRewritable, LogicalJoin, PlanBase, PlanTreeNodeBinary, StreamJoinCommon, StreamNode,
29 StreamPlanRef as PlanRef, generic,
30};
31use crate::TableCatalog;
32use crate::expr::{ExprRewriter, ExprVisitor};
33use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
34use crate::optimizer::plan_node::generic::GenericPlanNode;
35use crate::optimizer::plan_node::utils::IndicesDisplay;
36use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
37use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
38use crate::stream_fragmenter::BuildFragmentGraphState;
39
/// Stream-side plan node for an AS OF join (`JoinType::AsofInner` or
/// `JoinType::AsofLeftOuter`, as asserted by [`StreamAsOfJoin::new`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamAsOfJoin {
    /// Stream plan base built in `new` via `PlanBase::new_stream_with_core`.
    pub base: PlanBase<Stream>,
    /// Generic join core: both inputs, the join type, the `on` condition and the output indices.
    core: generic::Join<PlanRef>,

    /// Cached result of `stream_kind.is_append_only()` computed at construction time;
    /// surfaces as the `append_only` property in the explain output.
    is_append_only: bool,

    /// Descriptor of the AS OF inequality condition. Its `left_idx` / `right_idx` are used as
    /// column indices into the left / right input respectively.
    inequality_desc: AsOfJoinDesc,
}
53
54impl StreamAsOfJoin {
55 pub fn new(core: generic::Join<PlanRef>, inequality_desc: AsOfJoinDesc) -> Result<Self> {
56 assert!(core.join_type == JoinType::AsofInner || core.join_type == JoinType::AsofLeftOuter);
57
58 let stream_kind = core.stream_kind()?;
59
60 let dist = StreamJoinCommon::derive_dist(
61 core.left.distribution(),
62 core.right.distribution(),
63 &core,
64 );
65
66 let eq_join_predicate = core
67 .on
68 .as_eq_predicate_ref()
69 .expect("StreamAsOfJoin requires JoinOn::EqPredicate in core");
70
71 let watermark_columns = {
72 let l2i = core.l2i_col_mapping();
73 let r2i = core.r2i_col_mapping();
74 let mut watermark_columns = WatermarkColumns::new();
75 for (left_idx, right_idx) in
76 eq_join_predicate
77 .eq_indexes()
78 .into_iter()
79 .chain(std::iter::once((
80 inequality_desc.left_idx as usize,
81 inequality_desc.right_idx as usize,
82 )))
83 {
84 if let Some(l_wtmk_group) = core.left.watermark_columns().get_group(left_idx)
85 && let Some(r_wtmk_group) = core.right.watermark_columns().get_group(right_idx)
86 {
87 if let Some(internal) = l2i.try_map(left_idx) {
88 watermark_columns.insert(
89 internal,
90 l_wtmk_group.same_or_else(r_wtmk_group, || {
91 core.ctx().next_watermark_group_id()
92 }),
93 );
94 }
95 if let Some(internal) = r2i.try_map(right_idx) {
96 watermark_columns.insert(
97 internal,
98 l_wtmk_group.same_or_else(r_wtmk_group, || {
99 core.ctx().next_watermark_group_id()
100 }),
101 );
102 }
103 }
104 }
105 watermark_columns.map_clone(&core.i2o_col_mapping())
106 };
107
108 let base = PlanBase::new_stream_with_core(
110 &core,
111 dist,
112 stream_kind,
113 false, watermark_columns,
115 MonotonicityMap::new(), );
117
118 Ok(Self {
119 base,
120 core,
121 is_append_only: stream_kind.is_append_only(),
122 inequality_desc,
123 })
124 }
125
126 pub fn join_type(&self) -> JoinType {
128 self.core.join_type
129 }
130
131 pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
133 self.core
134 .on
135 .as_eq_predicate_ref()
136 .expect("StreamAsOfJoin should store predicate as EqJoinPredicate")
137 }
138
139 pub fn derive_dist_key_in_join_key(&self) -> Vec<usize> {
140 let left_dk_indices = self.left().distribution().dist_column_indices().to_vec();
141 let right_dk_indices = self.right().distribution().dist_column_indices().to_vec();
142
143 StreamJoinCommon::get_dist_key_in_join_key(
144 &left_dk_indices,
145 &right_dk_indices,
146 self.eq_join_predicate(),
147 )
148 }
149
150 pub fn infer_internal_table_catalog<I: StreamPlanNodeMetadata>(
152 input: I,
153 join_key_indices: Vec<usize>,
154 dk_indices_in_jk: Vec<usize>,
155 inequality_key_idx: usize,
156 ) -> (TableCatalog, Vec<usize>) {
157 let schema = input.schema();
158
159 let internal_table_dist_keys = dk_indices_in_jk
160 .iter()
161 .map(|idx| join_key_indices[*idx])
162 .collect_vec();
163
164 let join_key_len = join_key_indices.len();
166 let mut pk_indices = join_key_indices;
167
168 let mut deduped_input_pk_indices = vec![];
170 for input_pk_idx in input.stream_key().unwrap() {
171 if !pk_indices.contains(input_pk_idx)
172 && !deduped_input_pk_indices.contains(input_pk_idx)
173 {
174 deduped_input_pk_indices.push(*input_pk_idx);
175 }
176 }
177
178 pk_indices.push(inequality_key_idx);
179 pk_indices.extend(deduped_input_pk_indices.clone());
180
181 let mut internal_table_catalog_builder = TableCatalogBuilder::default();
183 let internal_columns_fields = schema.fields().to_vec();
184
185 internal_columns_fields.iter().for_each(|field| {
186 internal_table_catalog_builder.add_column(field);
187 });
188 pk_indices.iter().for_each(|idx| {
189 internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
190 });
191
192 internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk);
193
194 (
195 internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len),
196 deduped_input_pk_indices,
197 )
198 }
199}
200
201impl Distill for StreamAsOfJoin {
202 fn distill<'a>(&self) -> XmlNode<'a> {
203 let (ljk, rjk) = self
204 .eq_join_predicate()
205 .eq_indexes()
206 .first()
207 .cloned()
208 .expect("first join key");
209
210 let name = plan_node_name!("StreamAsOfJoin",
211 { "window", self.left().watermark_columns().contains(ljk) && self.right().watermark_columns().contains(rjk) },
212 { "append_only", self.is_append_only },
213 );
214 let verbose = self.base.ctx().is_explain_verbose();
215 let mut vec = Vec::with_capacity(6);
216 vec.push(("type", Pretty::debug(&self.core.join_type)));
217
218 let concat_schema = self.core.concat_schema();
219 vec.push((
220 "predicate",
221 Pretty::debug(&EqJoinPredicateDisplay {
222 eq_join_predicate: self.eq_join_predicate(),
223 input_schema: &concat_schema,
224 }),
225 ));
226
227 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
228 vec.push(("output_watermarks", ow));
229 }
230
231 if verbose {
232 let data = IndicesDisplay::from_join(&self.core, &concat_schema);
233 vec.push(("output", data));
234 }
235
236 childless_record(name, vec)
237 }
238}
239
240impl PlanTreeNodeBinary<Stream> for StreamAsOfJoin {
241 fn left(&self) -> PlanRef {
242 self.core.left.clone()
243 }
244
245 fn right(&self) -> PlanRef {
246 self.core.right.clone()
247 }
248
249 fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
250 let mut core = self.core.clone();
251 core.left = left;
252 core.right = right;
253
254 Self::new(core, self.inequality_desc).unwrap()
255 }
256}
257
258impl_plan_tree_node_for_binary! { Stream, StreamAsOfJoin }
259
260impl StreamNode for StreamAsOfJoin {
261 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
262 let left_jk_indices = self.eq_join_predicate().left_eq_indexes();
263 let right_jk_indices = self.eq_join_predicate().right_eq_indexes();
264 let left_jk_indices_prost = left_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
265 let right_jk_indices_prost = right_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
266
267 let dk_indices_in_jk = self.derive_dist_key_in_join_key();
268
269 let (left_table, left_deduped_input_pk_indices) = Self::infer_internal_table_catalog(
270 self.left().plan_base(),
271 left_jk_indices,
272 dk_indices_in_jk.clone(),
273 self.inequality_desc.left_idx as usize,
274 );
275 let (right_table, right_deduped_input_pk_indices) = Self::infer_internal_table_catalog(
276 self.right().plan_base(),
277 right_jk_indices,
278 dk_indices_in_jk,
279 self.inequality_desc.right_idx as usize,
280 );
281
282 let left_deduped_input_pk_indices = left_deduped_input_pk_indices
283 .iter()
284 .map(|idx| *idx as u32)
285 .collect_vec();
286
287 let right_deduped_input_pk_indices = right_deduped_input_pk_indices
288 .iter()
289 .map(|idx| *idx as u32)
290 .collect_vec();
291
292 let left_table = left_table.with_id(state.gen_table_id_wrapped());
293 let right_table = right_table.with_id(state.gen_table_id_wrapped());
294
295 let null_safe_prost = self.eq_join_predicate().null_safes().into_iter().collect();
296
297 let asof_join_type = match self.core.join_type {
298 JoinType::AsofInner => AsOfJoinType::Inner,
299 JoinType::AsofLeftOuter => AsOfJoinType::LeftOuter,
300 _ => unreachable!(),
301 };
302
303 NodeBody::AsOfJoin(Box::new(AsOfJoinNode {
304 join_type: asof_join_type.into(),
305 left_key: left_jk_indices_prost,
306 right_key: right_jk_indices_prost,
307 null_safe: null_safe_prost,
308 left_table: Some(left_table.to_internal_table_prost()),
309 right_table: Some(right_table.to_internal_table_prost()),
310 left_deduped_input_pk_indices,
311 right_deduped_input_pk_indices,
312 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
313 asof_desc: Some(self.inequality_desc),
314 #[allow(deprecated)]
316 join_encoding_type: PbJoinEncodingType::Unspecified as _,
317 }))
318 }
319}
320
321impl ExprRewritable<Stream> for StreamAsOfJoin {
322 fn has_rewritable_expr(&self) -> bool {
323 true
324 }
325
326 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
327 let mut core = self.core.clone();
328 core.rewrite_exprs(r);
329 let eq_join_predicate = core
330 .on
331 .as_eq_predicate_ref()
332 .expect("StreamAsOfJoin should store predicate as EqJoinPredicate")
333 .clone();
334 let desc = LogicalJoin::get_inequality_desc_from_predicate(
335 eq_join_predicate.other_cond().clone(),
336 core.left.schema().len(),
337 )
338 .unwrap();
339
340 Self::new(core, desc).unwrap().into()
341 }
342}
343
impl ExprVisitable for StreamAsOfJoin {
    /// Visits the expressions of this node by delegating to the generic join core.
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.core.visit_exprs(v);
    }
}