// risingwave_frontend/optimizer/plan_node/stream_temporal_join.rs
1use itertools::Itertools;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::util::sort_util::OrderType;
18use risingwave_pb::plan_common::JoinType;
19use risingwave_pb::stream_plan::TemporalJoinNode;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21use risingwave_sqlparser::ast::AsOf;
22
23use super::stream::prelude::*;
24use super::utils::{Distill, childless_record, watermark_pretty};
25use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, generic};
26use crate::TableCatalog;
27use crate::expr::{Expr, ExprRewriter, ExprVisitor};
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::generic::GenericPlanNode;
30use crate::optimizer::plan_node::plan_tree_node::PlanTreeNodeUnary;
31use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
32use crate::optimizer::plan_node::{
33 EqJoinPredicate, EqJoinPredicateDisplay, StreamExchange, StreamTableScan, TryToStreamPb,
34};
35use crate::scheduler::SchedulerResult;
36use crate::stream_fragmenter::BuildFragmentGraphState;
37use crate::utils::ColIndexMappingRewriteExt;
38
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40pub struct StreamTemporalJoin {
41 pub base: PlanBase<Stream>,
42 core: generic::Join<PlanRef>,
43 eq_join_predicate: EqJoinPredicate,
44 append_only: bool,
45 is_nested_loop: bool,
46}
47
48impl StreamTemporalJoin {
49 pub fn new(
50 core: generic::Join<PlanRef>,
51 eq_join_predicate: EqJoinPredicate,
52 is_nested_loop: bool,
53 ) -> Self {
54 assert!(core.join_type == JoinType::Inner || core.join_type == JoinType::LeftOuter);
55 let append_only = core.left.append_only();
56 assert!(!is_nested_loop || append_only);
57
58 let right = core.right.clone();
59 let exchange: &StreamExchange = right
60 .as_stream_exchange()
61 .expect("should be a no shuffle stream exchange");
62 assert!(exchange.no_shuffle());
63 let exchange_input = exchange.input();
64 let scan: &StreamTableScan = exchange_input
65 .as_stream_table_scan()
66 .expect("should be a stream table scan");
67 assert!(matches!(scan.core().as_of, Some(AsOf::ProcessTime)));
68
69 let dist = if is_nested_loop {
70 let r2o = core.r2i_col_mapping().composite(&core.i2o_col_mapping());
72 r2o.rewrite_provided_distribution(core.right.distribution())
73 } else {
74 let l2o = core.l2i_col_mapping().composite(&core.i2o_col_mapping());
77 l2o.rewrite_provided_distribution(core.left.distribution())
78 };
79
80 let watermark_columns = core
82 .left
83 .watermark_columns()
84 .map_clone(&core.l2i_col_mapping())
85 .map_clone(&core.i2o_col_mapping());
86
87 let columns_monotonicity = core.i2o_col_mapping().rewrite_monotonicity_map(
88 &core
89 .l2i_col_mapping()
90 .rewrite_monotonicity_map(core.left.columns_monotonicity()),
91 );
92
93 let base = PlanBase::new_stream_with_core(
94 &core,
95 dist,
96 append_only,
97 false, watermark_columns,
99 columns_monotonicity,
100 );
101
102 Self {
103 base,
104 core,
105 eq_join_predicate,
106 append_only,
107 is_nested_loop,
108 }
109 }
110
111 pub fn join_type(&self) -> JoinType {
113 self.core.join_type
114 }
115
116 pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
117 &self.eq_join_predicate
118 }
119
120 pub fn append_only(&self) -> bool {
121 self.append_only
122 }
123
124 pub fn is_nested_loop(&self) -> bool {
125 self.eq_join_predicate().has_eq()
126 }
127
128 pub fn infer_memo_table_catalog(&self, right_scan: &StreamTableScan) -> TableCatalog {
137 let left_eq_indexes = self.eq_join_predicate.left_eq_indexes();
138 let read_prefix_len_hint = left_eq_indexes.len() + self.left().stream_key().unwrap().len();
139
140 let mut internal_table_catalog_builder = TableCatalogBuilder::default();
142 let right_scan_schema = right_scan.core().schema();
144 for field in right_scan_schema.fields() {
145 internal_table_catalog_builder.add_column(field);
146 }
147 for field in left_eq_indexes
149 .iter()
150 .chain(self.core.left.stream_key().unwrap())
151 .map(|idx| &self.core.left.schema().fields()[*idx])
152 {
153 internal_table_catalog_builder.add_column(field);
154 }
155
156 let mut pk_indices = vec![];
157 pk_indices
158 .extend(right_scan_schema.len()..(right_scan_schema.len() + read_prefix_len_hint));
159 pk_indices.extend(right_scan.stream_key().unwrap());
160
161 pk_indices.iter().for_each(|idx| {
162 internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
163 });
164
165 let dist_key_len = right_scan
166 .core()
167 .distribution_key()
168 .map(|keys| keys.len())
169 .unwrap_or(0);
170
171 let internal_table_dist_keys =
172 (right_scan_schema.len()..(right_scan_schema.len() + dist_key_len)).collect();
173 internal_table_catalog_builder.build(internal_table_dist_keys, read_prefix_len_hint)
174 }
175}
176
177impl Distill for StreamTemporalJoin {
178 fn distill<'a>(&self) -> XmlNode<'a> {
179 let verbose = self.base.ctx().is_explain_verbose();
180 let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
181 vec.push(("type", Pretty::debug(&self.core.join_type)));
182 vec.push(("append_only", Pretty::debug(&self.append_only)));
183
184 let concat_schema = self.core.concat_schema();
185 vec.push((
186 "predicate",
187 Pretty::debug(&EqJoinPredicateDisplay {
188 eq_join_predicate: self.eq_join_predicate(),
189 input_schema: &concat_schema,
190 }),
191 ));
192
193 vec.push(("nested_loop", Pretty::debug(&self.is_nested_loop)));
194
195 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
196 vec.push(("output_watermarks", ow));
197 }
198
199 if verbose {
200 let data = IndicesDisplay::from_join(&self.core, &concat_schema);
201 vec.push(("output", data));
202 }
203
204 childless_record("StreamTemporalJoin", vec)
205 }
206}
207
208impl PlanTreeNodeBinary for StreamTemporalJoin {
209 fn left(&self) -> PlanRef {
210 self.core.left.clone()
211 }
212
213 fn right(&self) -> PlanRef {
214 self.core.right.clone()
215 }
216
217 fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
218 let mut core = self.core.clone();
219 core.left = left;
220 core.right = right;
221 Self::new(core, self.eq_join_predicate.clone(), self.is_nested_loop)
222 }
223}
224
225impl_plan_tree_node_for_binary! { StreamTemporalJoin }
226
227impl TryToStreamPb for StreamTemporalJoin {
228 fn try_to_stream_prost_body(
229 &self,
230 state: &mut BuildFragmentGraphState,
231 ) -> SchedulerResult<NodeBody> {
232 let left_jk_indices = self.eq_join_predicate.left_eq_indexes();
233 let right_jk_indices = self.eq_join_predicate.right_eq_indexes();
234 let left_jk_indices_prost = left_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
235 let right_jk_indices_prost = right_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
236
237 let null_safe_prost = self.eq_join_predicate.null_safes().into_iter().collect();
238
239 let right = self.right();
240 let exchange: &StreamExchange = right
241 .as_stream_exchange()
242 .expect("should be a no shuffle stream exchange");
243 assert!(exchange.no_shuffle());
244 let exchange_input = exchange.input();
245 let scan: &StreamTableScan = exchange_input
246 .as_stream_table_scan()
247 .expect("should be a stream table scan");
248
249 Ok(NodeBody::TemporalJoin(Box::new(TemporalJoinNode {
250 join_type: self.core.join_type as i32,
251 left_key: left_jk_indices_prost,
252 right_key: right_jk_indices_prost,
253 null_safe: null_safe_prost,
254 condition: self
255 .eq_join_predicate
256 .other_cond()
257 .as_expr_unless_true()
258 .map(|x| x.to_expr_proto()),
259 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
260 table_desc: Some(scan.core().table_desc.try_to_protobuf()?),
261 table_output_indices: scan.core().output_col_idx.iter().map(|&i| i as _).collect(),
262 memo_table: if self.append_only {
263 None
264 } else {
265 let mut memo_table = self.infer_memo_table_catalog(scan);
266 memo_table = memo_table.with_id(state.gen_table_id_wrapped());
267 Some(memo_table.to_internal_table_prost())
268 },
269 is_nested_loop: self.is_nested_loop,
270 })))
271 }
272}
273
274impl ExprRewritable for StreamTemporalJoin {
275 fn has_rewritable_expr(&self) -> bool {
276 true
277 }
278
279 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
280 let mut core = self.core.clone();
281 core.rewrite_exprs(r);
282 Self::new(
283 core,
284 self.eq_join_predicate.rewrite_exprs(r),
285 self.is_nested_loop,
286 )
287 .into()
288 }
289}
290
291impl ExprVisitable for StreamTemporalJoin {
292 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
293 self.core.visit_exprs(v);
294 self.eq_join_predicate.visit_exprs(v);
295 }
296}