risingwave_frontend/optimizer/plan_node/
stream_asof_join.rs1use itertools::Itertools;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::session_config::join_encoding_type::JoinEncodingType;
18use risingwave_common::util::sort_util::OrderType;
19use risingwave_pb::plan_common::{AsOfJoinDesc, AsOfJoinType, JoinType};
20use risingwave_pb::stream_plan::AsOfJoinNode;
21use risingwave_pb::stream_plan::stream_node::NodeBody;
22
23use super::stream::prelude::*;
24use super::utils::{
25 Distill, TableCatalogBuilder, childless_record, plan_node_name, watermark_pretty,
26};
27use super::{
28 ExprRewritable, LogicalJoin, PlanBase, PlanTreeNodeBinary, StreamJoinCommon, StreamNode,
29 StreamPlanRef as PlanRef, generic,
30};
31use crate::TableCatalog;
32use crate::expr::{ExprRewriter, ExprVisitor};
33use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
34use crate::optimizer::plan_node::generic::GenericPlanNode;
35use crate::optimizer::plan_node::utils::IndicesDisplay;
36use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
37use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
38use crate::stream_fragmenter::BuildFragmentGraphState;
39
40#[derive(Debug, Clone, PartialEq, Eq, Hash)]
42pub struct StreamAsOfJoin {
43 pub base: PlanBase<Stream>,
44 core: generic::Join<PlanRef>,
45
46 eq_join_predicate: EqJoinPredicate,
49
50 is_append_only: bool,
53
54 inequality_desc: AsOfJoinDesc,
56
57 join_encoding_type: JoinEncodingType,
59}
60
61impl StreamAsOfJoin {
62 pub fn new(
63 core: generic::Join<PlanRef>,
64 eq_join_predicate: EqJoinPredicate,
65 inequality_desc: AsOfJoinDesc,
66 ) -> Result<Self> {
67 let ctx = core.ctx();
68
69 assert!(core.join_type == JoinType::AsofInner || core.join_type == JoinType::AsofLeftOuter);
70
71 let stream_kind = core.stream_kind()?;
72
73 let dist = StreamJoinCommon::derive_dist(
74 core.left.distribution(),
75 core.right.distribution(),
76 &core,
77 );
78
79 let watermark_columns = WatermarkColumns::new();
81
82 let base = PlanBase::new_stream_with_core(
84 &core,
85 dist,
86 stream_kind,
87 false, watermark_columns,
89 MonotonicityMap::new(), );
91
92 Ok(Self {
93 base,
94 core,
95 eq_join_predicate,
96 is_append_only: stream_kind.is_append_only(),
97 inequality_desc,
98 join_encoding_type: ctx.session_ctx().config().streaming_join_encoding(),
99 })
100 }
101
102 pub fn join_type(&self) -> JoinType {
104 self.core.join_type
105 }
106
107 pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
109 &self.eq_join_predicate
110 }
111
112 pub fn derive_dist_key_in_join_key(&self) -> Vec<usize> {
113 let left_dk_indices = self.left().distribution().dist_column_indices().to_vec();
114 let right_dk_indices = self.right().distribution().dist_column_indices().to_vec();
115
116 StreamJoinCommon::get_dist_key_in_join_key(
117 &left_dk_indices,
118 &right_dk_indices,
119 self.eq_join_predicate(),
120 )
121 }
122
123 pub fn infer_internal_table_catalog<I: StreamPlanNodeMetadata>(
125 input: I,
126 join_key_indices: Vec<usize>,
127 dk_indices_in_jk: Vec<usize>,
128 inequality_key_idx: usize,
129 ) -> (TableCatalog, Vec<usize>) {
130 let schema = input.schema();
131
132 let internal_table_dist_keys = dk_indices_in_jk
133 .iter()
134 .map(|idx| join_key_indices[*idx])
135 .collect_vec();
136
137 let join_key_len = join_key_indices.len();
139 let mut pk_indices = join_key_indices;
140
141 let mut deduped_input_pk_indices = vec![];
143 for input_pk_idx in input.stream_key().unwrap() {
144 if !pk_indices.contains(input_pk_idx)
145 && !deduped_input_pk_indices.contains(input_pk_idx)
146 {
147 deduped_input_pk_indices.push(*input_pk_idx);
148 }
149 }
150
151 pk_indices.push(inequality_key_idx);
152 pk_indices.extend(deduped_input_pk_indices.clone());
153
154 let mut internal_table_catalog_builder = TableCatalogBuilder::default();
156 let internal_columns_fields = schema.fields().to_vec();
157
158 internal_columns_fields.iter().for_each(|field| {
159 internal_table_catalog_builder.add_column(field);
160 });
161 pk_indices.iter().for_each(|idx| {
162 internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
163 });
164
165 internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk.clone());
166
167 (
168 internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len),
169 deduped_input_pk_indices,
170 )
171 }
172}
173
174impl Distill for StreamAsOfJoin {
175 fn distill<'a>(&self) -> XmlNode<'a> {
176 let (ljk, rjk) = self
177 .eq_join_predicate
178 .eq_indexes()
179 .first()
180 .cloned()
181 .expect("first join key");
182
183 let name = plan_node_name!("StreamAsOfJoin",
184 { "window", self.left().watermark_columns().contains(ljk) && self.right().watermark_columns().contains(rjk) },
185 { "append_only", self.is_append_only },
186 );
187 let verbose = self.base.ctx().is_explain_verbose();
188 let mut vec = Vec::with_capacity(6);
189 vec.push(("type", Pretty::debug(&self.core.join_type)));
190
191 let concat_schema = self.core.concat_schema();
192 vec.push((
193 "predicate",
194 Pretty::debug(&EqJoinPredicateDisplay {
195 eq_join_predicate: self.eq_join_predicate(),
196 input_schema: &concat_schema,
197 }),
198 ));
199
200 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
201 vec.push(("output_watermarks", ow));
202 }
203
204 if verbose {
205 let data = IndicesDisplay::from_join(&self.core, &concat_schema);
206 vec.push(("output", data));
207 }
208
209 childless_record(name, vec)
210 }
211}
212
213impl PlanTreeNodeBinary<Stream> for StreamAsOfJoin {
214 fn left(&self) -> PlanRef {
215 self.core.left.clone()
216 }
217
218 fn right(&self) -> PlanRef {
219 self.core.right.clone()
220 }
221
222 fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
223 let mut core = self.core.clone();
224 core.left = left;
225 core.right = right;
226
227 Self::new(core, self.eq_join_predicate.clone(), self.inequality_desc).unwrap()
228 }
229}
230
231impl_plan_tree_node_for_binary! { Stream, StreamAsOfJoin }
232
233impl StreamNode for StreamAsOfJoin {
234 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
235 let left_jk_indices = self.eq_join_predicate.left_eq_indexes();
236 let right_jk_indices = self.eq_join_predicate.right_eq_indexes();
237 let left_jk_indices_prost = left_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
238 let right_jk_indices_prost = right_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
239
240 let dk_indices_in_jk = self.derive_dist_key_in_join_key();
241
242 let (left_table, left_deduped_input_pk_indices) = Self::infer_internal_table_catalog(
243 self.left().plan_base(),
244 left_jk_indices,
245 dk_indices_in_jk.clone(),
246 self.inequality_desc.left_idx as usize,
247 );
248 let (right_table, right_deduped_input_pk_indices) = Self::infer_internal_table_catalog(
249 self.right().plan_base(),
250 right_jk_indices,
251 dk_indices_in_jk,
252 self.inequality_desc.right_idx as usize,
253 );
254
255 let left_deduped_input_pk_indices = left_deduped_input_pk_indices
256 .iter()
257 .map(|idx| *idx as u32)
258 .collect_vec();
259
260 let right_deduped_input_pk_indices = right_deduped_input_pk_indices
261 .iter()
262 .map(|idx| *idx as u32)
263 .collect_vec();
264
265 let left_table = left_table.with_id(state.gen_table_id_wrapped());
266 let right_table = right_table.with_id(state.gen_table_id_wrapped());
267
268 let null_safe_prost = self.eq_join_predicate.null_safes().into_iter().collect();
269
270 let asof_join_type = match self.core.join_type {
271 JoinType::AsofInner => AsOfJoinType::Inner,
272 JoinType::AsofLeftOuter => AsOfJoinType::LeftOuter,
273 _ => unreachable!(),
274 };
275
276 NodeBody::AsOfJoin(Box::new(AsOfJoinNode {
277 join_type: asof_join_type.into(),
278 left_key: left_jk_indices_prost,
279 right_key: right_jk_indices_prost,
280 null_safe: null_safe_prost,
281 left_table: Some(left_table.to_internal_table_prost()),
282 right_table: Some(right_table.to_internal_table_prost()),
283 left_deduped_input_pk_indices,
284 right_deduped_input_pk_indices,
285 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
286 asof_desc: Some(self.inequality_desc),
287 join_encoding_type: self.join_encoding_type as i32,
288 }))
289 }
290}
291
292impl ExprRewritable<Stream> for StreamAsOfJoin {
293 fn has_rewritable_expr(&self) -> bool {
294 true
295 }
296
297 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
298 let mut core = self.core.clone();
299 core.rewrite_exprs(r);
300 let eq_join_predicate = self.eq_join_predicate.rewrite_exprs(r);
301 let desc = LogicalJoin::get_inequality_desc_from_predicate(
302 eq_join_predicate.other_cond().clone(),
303 core.left.schema().len(),
304 )
305 .unwrap();
306
307 Self::new(core, eq_join_predicate, desc).unwrap().into()
308 }
309}
310
311impl ExprVisitable for StreamAsOfJoin {
312 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
313 self.core.visit_exprs(v);
314 }
315}