// risingwave_frontend/optimizer/plan_node/stream_asof_join.rs
use itertools::Itertools;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::util::sort_util::OrderType;
18use risingwave_pb::plan_common::{AsOfJoinDesc, AsOfJoinType, JoinType};
19use risingwave_pb::stream_plan::AsOfJoinNode;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21
22use super::stream::prelude::*;
23use super::utils::{
24 Distill, TableCatalogBuilder, childless_record, plan_node_name, watermark_pretty,
25};
26use super::{
27 ExprRewritable, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamJoinCommon,
28 StreamNode, generic,
29};
30use crate::TableCatalog;
31use crate::expr::{ExprRewriter, ExprVisitor};
32use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
33use crate::optimizer::plan_node::utils::IndicesDisplay;
34use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
35use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
36use crate::stream_fragmenter::BuildFragmentGraphState;
37
/// Streaming plan node for an as-of join (`JoinType::AsofInner` or `JoinType::AsofLeftOuter`).
///
/// Combines equality matching on the join keys (`eq_join_predicate`) with a single as-of
/// inequality (`inequality_desc`). It lowers to an `AsOfJoinNode` with one internal state
/// table per input.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamAsOfJoin {
    // Stream plan properties (schema, distribution, append-only flag, watermarks, ...).
    pub base: PlanBase<Stream>,
    // Generic join holding the two inputs, the join type and the output indices.
    core: generic::Join<PlanRef>,

    // Equi-join part of the condition. Its `other_cond()` is what the as-of inequality
    // descriptor is re-derived from when expressions are rewritten.
    eq_join_predicate: EqJoinPredicate,

    // Whether the output is append-only; nodes built via `new` always store `false`.
    is_append_only: bool,

    // As-of inequality descriptor. `left_idx`/`right_idx` are used as the inequality-key
    // column of the left/right internal state tables.
    inequality_desc: AsOfJoinDesc,
}
55
56impl StreamAsOfJoin {
57 pub fn new(
58 core: generic::Join<PlanRef>,
59 eq_join_predicate: EqJoinPredicate,
60 inequality_desc: AsOfJoinDesc,
61 ) -> Self {
62 assert!(core.join_type == JoinType::AsofInner || core.join_type == JoinType::AsofLeftOuter);
63
64 let append_only = match core.join_type {
66 JoinType::Inner => core.left.append_only() && core.right.append_only(),
67 _ => false,
68 };
69
70 let dist = StreamJoinCommon::derive_dist(
71 core.left.distribution(),
72 core.right.distribution(),
73 &core,
74 );
75
76 let watermark_columns = WatermarkColumns::new();
78
79 let base = PlanBase::new_stream_with_core(
81 &core,
82 dist,
83 append_only,
84 false, watermark_columns,
86 MonotonicityMap::new(), );
88
89 Self {
90 base,
91 core,
92 eq_join_predicate,
93 is_append_only: append_only,
94 inequality_desc,
95 }
96 }
97
98 pub fn join_type(&self) -> JoinType {
100 self.core.join_type
101 }
102
103 pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
105 &self.eq_join_predicate
106 }
107
108 pub fn derive_dist_key_in_join_key(&self) -> Vec<usize> {
109 let left_dk_indices = self.left().distribution().dist_column_indices().to_vec();
110 let right_dk_indices = self.right().distribution().dist_column_indices().to_vec();
111
112 StreamJoinCommon::get_dist_key_in_join_key(
113 &left_dk_indices,
114 &right_dk_indices,
115 self.eq_join_predicate(),
116 )
117 }
118
119 pub fn infer_internal_table_catalog<I: StreamPlanRef>(
121 input: I,
122 join_key_indices: Vec<usize>,
123 dk_indices_in_jk: Vec<usize>,
124 inequality_key_idx: usize,
125 ) -> (TableCatalog, Vec<usize>) {
126 let schema = input.schema();
127
128 let internal_table_dist_keys = dk_indices_in_jk
129 .iter()
130 .map(|idx| join_key_indices[*idx])
131 .collect_vec();
132
133 let join_key_len = join_key_indices.len();
135 let mut pk_indices = join_key_indices;
136
137 let mut deduped_input_pk_indices = vec![];
139 for input_pk_idx in input.stream_key().unwrap() {
140 if !pk_indices.contains(input_pk_idx)
141 && !deduped_input_pk_indices.contains(input_pk_idx)
142 {
143 deduped_input_pk_indices.push(*input_pk_idx);
144 }
145 }
146
147 pk_indices.push(inequality_key_idx);
148 pk_indices.extend(deduped_input_pk_indices.clone());
149
150 let mut internal_table_catalog_builder = TableCatalogBuilder::default();
152 let internal_columns_fields = schema.fields().to_vec();
153
154 internal_columns_fields.iter().for_each(|field| {
155 internal_table_catalog_builder.add_column(field);
156 });
157 pk_indices.iter().for_each(|idx| {
158 internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
159 });
160
161 internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk.clone());
162
163 (
164 internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len),
165 deduped_input_pk_indices,
166 )
167 }
168}
169
170impl Distill for StreamAsOfJoin {
171 fn distill<'a>(&self) -> XmlNode<'a> {
172 let (ljk, rjk) = self
173 .eq_join_predicate
174 .eq_indexes()
175 .first()
176 .cloned()
177 .expect("first join key");
178
179 let name = plan_node_name!("StreamAsOfJoin",
180 { "window", self.left().watermark_columns().contains(ljk) && self.right().watermark_columns().contains(rjk) },
181 { "append_only", self.is_append_only },
182 );
183 let verbose = self.base.ctx().is_explain_verbose();
184 let mut vec = Vec::with_capacity(6);
185 vec.push(("type", Pretty::debug(&self.core.join_type)));
186
187 let concat_schema = self.core.concat_schema();
188 vec.push((
189 "predicate",
190 Pretty::debug(&EqJoinPredicateDisplay {
191 eq_join_predicate: self.eq_join_predicate(),
192 input_schema: &concat_schema,
193 }),
194 ));
195
196 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
197 vec.push(("output_watermarks", ow));
198 }
199
200 if verbose {
201 let data = IndicesDisplay::from_join(&self.core, &concat_schema);
202 vec.push(("output", data));
203 }
204
205 childless_record(name, vec)
206 }
207}
208
209impl PlanTreeNodeBinary for StreamAsOfJoin {
210 fn left(&self) -> PlanRef {
211 self.core.left.clone()
212 }
213
214 fn right(&self) -> PlanRef {
215 self.core.right.clone()
216 }
217
218 fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
219 let mut core = self.core.clone();
220 core.left = left;
221 core.right = right;
222 Self::new(core, self.eq_join_predicate.clone(), self.inequality_desc)
223 }
224}
225
226impl_plan_tree_node_for_binary! { StreamAsOfJoin }
227
228impl StreamNode for StreamAsOfJoin {
229 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
230 let left_jk_indices = self.eq_join_predicate.left_eq_indexes();
231 let right_jk_indices = self.eq_join_predicate.right_eq_indexes();
232 let left_jk_indices_prost = left_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
233 let right_jk_indices_prost = right_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
234
235 let dk_indices_in_jk = self.derive_dist_key_in_join_key();
236
237 let (left_table, left_deduped_input_pk_indices) = Self::infer_internal_table_catalog(
238 self.left().plan_base(),
239 left_jk_indices,
240 dk_indices_in_jk.clone(),
241 self.inequality_desc.left_idx as usize,
242 );
243 let (right_table, right_deduped_input_pk_indices) = Self::infer_internal_table_catalog(
244 self.right().plan_base(),
245 right_jk_indices,
246 dk_indices_in_jk,
247 self.inequality_desc.right_idx as usize,
248 );
249
250 let left_deduped_input_pk_indices = left_deduped_input_pk_indices
251 .iter()
252 .map(|idx| *idx as u32)
253 .collect_vec();
254
255 let right_deduped_input_pk_indices = right_deduped_input_pk_indices
256 .iter()
257 .map(|idx| *idx as u32)
258 .collect_vec();
259
260 let left_table = left_table.with_id(state.gen_table_id_wrapped());
261 let right_table = right_table.with_id(state.gen_table_id_wrapped());
262
263 let null_safe_prost = self.eq_join_predicate.null_safes().into_iter().collect();
264
265 let asof_join_type = match self.core.join_type {
266 JoinType::AsofInner => AsOfJoinType::Inner,
267 JoinType::AsofLeftOuter => AsOfJoinType::LeftOuter,
268 _ => unreachable!(),
269 };
270
271 NodeBody::AsOfJoin(Box::new(AsOfJoinNode {
272 join_type: asof_join_type.into(),
273 left_key: left_jk_indices_prost,
274 right_key: right_jk_indices_prost,
275 null_safe: null_safe_prost,
276 left_table: Some(left_table.to_internal_table_prost()),
277 right_table: Some(right_table.to_internal_table_prost()),
278 left_deduped_input_pk_indices,
279 right_deduped_input_pk_indices,
280 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
281 asof_desc: Some(self.inequality_desc),
282 }))
283 }
284}
285
286impl ExprRewritable for StreamAsOfJoin {
287 fn has_rewritable_expr(&self) -> bool {
288 true
289 }
290
291 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
292 let mut core = self.core.clone();
293 core.rewrite_exprs(r);
294 let eq_join_predicate = self.eq_join_predicate.rewrite_exprs(r);
295 let desc = LogicalJoin::get_inequality_desc_from_predicate(
296 eq_join_predicate.other_cond().clone(),
297 core.left.schema().len(),
298 )
299 .unwrap();
300 Self::new(core, eq_join_predicate, desc).into()
301 }
302}
303
304impl ExprVisitable for StreamAsOfJoin {
305 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
306 self.core.visit_exprs(v);
307 }
308}