risingwave_frontend/optimizer/plan_node/
stream_temporal_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::util::sort_util::OrderType;
18use risingwave_pb::plan_common::JoinType;
19use risingwave_pb::stream_plan::TemporalJoinNode;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21use risingwave_sqlparser::ast::AsOf;
22
23use super::stream::prelude::*;
24use super::utils::{Distill, childless_record, watermark_pretty};
25use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, generic};
26use crate::TableCatalog;
27use crate::expr::{Expr, ExprRewriter, ExprVisitor};
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::generic::GenericPlanNode;
30use crate::optimizer::plan_node::plan_tree_node::PlanTreeNodeUnary;
31use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
32use crate::optimizer::plan_node::{
33    EqJoinPredicate, EqJoinPredicateDisplay, StreamExchange, StreamTableScan, TryToStreamPb,
34};
35use crate::scheduler::SchedulerResult;
36use crate::stream_fragmenter::BuildFragmentGraphState;
37use crate::utils::ColIndexMappingRewriteExt;
38
/// Streaming temporal join plan node.
///
/// Joins the left stream against a right-side table. The constructor (`new`) asserts that the
/// right input is a no-shuffle `StreamExchange` directly over a `StreamTableScan` reading
/// `AS OF PROCESS TIME`, and that the join type is `Inner` or `LeftOuter`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamTemporalJoin {
    /// Stream plan properties (distribution, append-only, watermark columns, column
    /// monotonicity) computed in `new` from `core`.
    pub base: PlanBase<Stream>,
    /// Generic join description: left/right inputs, join type and output column indices.
    core: generic::Join<PlanRef>,
    /// Equi-join keys (left/right eq indexes with null-safety flags) plus the residual
    /// non-equi condition (`other_cond`).
    eq_join_predicate: EqJoinPredicate,
    /// Copied from the left input's append-only property; when set, no memo table is built.
    append_only: bool,
    /// Whether this is a nested-loop temporal join rather than a hash temporal join.
    /// Only allowed when `append_only` is set.
    is_nested_loop: bool,
}
47
48impl StreamTemporalJoin {
49    pub fn new(
50        core: generic::Join<PlanRef>,
51        eq_join_predicate: EqJoinPredicate,
52        is_nested_loop: bool,
53    ) -> Self {
54        assert!(core.join_type == JoinType::Inner || core.join_type == JoinType::LeftOuter);
55        let append_only = core.left.append_only();
56        assert!(!is_nested_loop || append_only);
57
58        let right = core.right.clone();
59        let exchange: &StreamExchange = right
60            .as_stream_exchange()
61            .expect("should be a no shuffle stream exchange");
62        assert!(exchange.no_shuffle());
63        let exchange_input = exchange.input();
64        let scan: &StreamTableScan = exchange_input
65            .as_stream_table_scan()
66            .expect("should be a stream table scan");
67        assert!(matches!(scan.core().as_of, Some(AsOf::ProcessTime)));
68
69        let dist = if is_nested_loop {
70            // Use right side distribution directly if it's nested loop temporal join.
71            let r2o = core.r2i_col_mapping().composite(&core.i2o_col_mapping());
72            r2o.rewrite_provided_distribution(core.right.distribution())
73        } else {
74            // Use left side distribution directly if it's hash temporal join.
75            // https://github.com/risingwavelabs/risingwave/pull/19201#discussion_r1824031780
76            let l2o = core.l2i_col_mapping().composite(&core.i2o_col_mapping());
77            l2o.rewrite_provided_distribution(core.left.distribution())
78        };
79
80        // Use left side watermark directly.
81        let watermark_columns = core
82            .left
83            .watermark_columns()
84            .map_clone(&core.l2i_col_mapping())
85            .map_clone(&core.i2o_col_mapping());
86
87        let columns_monotonicity = core.i2o_col_mapping().rewrite_monotonicity_map(
88            &core
89                .l2i_col_mapping()
90                .rewrite_monotonicity_map(core.left.columns_monotonicity()),
91        );
92
93        let base = PlanBase::new_stream_with_core(
94            &core,
95            dist,
96            append_only,
97            false, // TODO(rc): derive EOWC property from input
98            watermark_columns,
99            columns_monotonicity,
100        );
101
102        Self {
103            base,
104            core,
105            eq_join_predicate,
106            append_only,
107            is_nested_loop,
108        }
109    }
110
111    /// Get join type
112    pub fn join_type(&self) -> JoinType {
113        self.core.join_type
114    }
115
116    pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
117        &self.eq_join_predicate
118    }
119
120    pub fn append_only(&self) -> bool {
121        self.append_only
122    }
123
124    pub fn is_nested_loop(&self) -> bool {
125        self.eq_join_predicate().has_eq()
126    }
127
128    /// Return memo-table catalog and its `pk_indices`.
129    /// (`join_key` + `left_pk` + `right_pk`) -> (`right_scan_schema` + `join_key` + `left_pk`)
130    ///
131    /// Write pattern:
132    ///   for each left input row (with insert op), construct the memo table pk and insert the row into the memo table.
133    ///
134    /// Read pattern:
135    ///   for each left input row (with delete op), construct pk prefix (`join_key` + `left_pk`) to fetch rows and delete them from the memo table.
136    pub fn infer_memo_table_catalog(&self, right_scan: &StreamTableScan) -> TableCatalog {
137        let left_eq_indexes = self.eq_join_predicate.left_eq_indexes();
138        let read_prefix_len_hint = left_eq_indexes.len() + self.left().stream_key().unwrap().len();
139
140        // Build internal table
141        let mut internal_table_catalog_builder = TableCatalogBuilder::default();
142        // Add right table fields
143        let right_scan_schema = right_scan.core().schema();
144        for field in right_scan_schema.fields() {
145            internal_table_catalog_builder.add_column(field);
146        }
147        // Add join_key + left_pk
148        for field in left_eq_indexes
149            .iter()
150            .chain(self.core.left.stream_key().unwrap())
151            .map(|idx| &self.core.left.schema().fields()[*idx])
152        {
153            internal_table_catalog_builder.add_column(field);
154        }
155
156        let mut pk_indices = vec![];
157        pk_indices
158            .extend(right_scan_schema.len()..(right_scan_schema.len() + read_prefix_len_hint));
159        pk_indices.extend(right_scan.stream_key().unwrap());
160
161        pk_indices.iter().for_each(|idx| {
162            internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
163        });
164
165        let dist_key_len = right_scan
166            .core()
167            .distribution_key()
168            .map(|keys| keys.len())
169            .unwrap_or(0);
170
171        let internal_table_dist_keys =
172            (right_scan_schema.len()..(right_scan_schema.len() + dist_key_len)).collect();
173        internal_table_catalog_builder.build(internal_table_dist_keys, read_prefix_len_hint)
174    }
175}
176
177impl Distill for StreamTemporalJoin {
178    fn distill<'a>(&self) -> XmlNode<'a> {
179        let verbose = self.base.ctx().is_explain_verbose();
180        let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
181        vec.push(("type", Pretty::debug(&self.core.join_type)));
182        vec.push(("append_only", Pretty::debug(&self.append_only)));
183
184        let concat_schema = self.core.concat_schema();
185        vec.push((
186            "predicate",
187            Pretty::debug(&EqJoinPredicateDisplay {
188                eq_join_predicate: self.eq_join_predicate(),
189                input_schema: &concat_schema,
190            }),
191        ));
192
193        vec.push(("nested_loop", Pretty::debug(&self.is_nested_loop)));
194
195        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
196            vec.push(("output_watermarks", ow));
197        }
198
199        if verbose {
200            let data = IndicesDisplay::from_join(&self.core, &concat_schema);
201            vec.push(("output", data));
202        }
203
204        childless_record("StreamTemporalJoin", vec)
205    }
206}
207
208impl PlanTreeNodeBinary for StreamTemporalJoin {
209    fn left(&self) -> PlanRef {
210        self.core.left.clone()
211    }
212
213    fn right(&self) -> PlanRef {
214        self.core.right.clone()
215    }
216
217    fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
218        let mut core = self.core.clone();
219        core.left = left;
220        core.right = right;
221        Self::new(core, self.eq_join_predicate.clone(), self.is_nested_loop)
222    }
223}
224
// Presumably expands to the generic plan-tree-node plumbing for a two-input node, built on
// the `PlanTreeNodeBinary` impl of this type (macro is defined elsewhere in the crate).
impl_plan_tree_node_for_binary! { StreamTemporalJoin }
226
227impl TryToStreamPb for StreamTemporalJoin {
228    fn try_to_stream_prost_body(
229        &self,
230        state: &mut BuildFragmentGraphState,
231    ) -> SchedulerResult<NodeBody> {
232        let left_jk_indices = self.eq_join_predicate.left_eq_indexes();
233        let right_jk_indices = self.eq_join_predicate.right_eq_indexes();
234        let left_jk_indices_prost = left_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
235        let right_jk_indices_prost = right_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
236
237        let null_safe_prost = self.eq_join_predicate.null_safes().into_iter().collect();
238
239        let right = self.right();
240        let exchange: &StreamExchange = right
241            .as_stream_exchange()
242            .expect("should be a no shuffle stream exchange");
243        assert!(exchange.no_shuffle());
244        let exchange_input = exchange.input();
245        let scan: &StreamTableScan = exchange_input
246            .as_stream_table_scan()
247            .expect("should be a stream table scan");
248
249        Ok(NodeBody::TemporalJoin(Box::new(TemporalJoinNode {
250            join_type: self.core.join_type as i32,
251            left_key: left_jk_indices_prost,
252            right_key: right_jk_indices_prost,
253            null_safe: null_safe_prost,
254            condition: self
255                .eq_join_predicate
256                .other_cond()
257                .as_expr_unless_true()
258                .map(|x| x.to_expr_proto()),
259            output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
260            table_desc: Some(scan.core().table_desc.try_to_protobuf()?),
261            table_output_indices: scan.core().output_col_idx.iter().map(|&i| i as _).collect(),
262            memo_table: if self.append_only {
263                None
264            } else {
265                let mut memo_table = self.infer_memo_table_catalog(scan);
266                memo_table = memo_table.with_id(state.gen_table_id_wrapped());
267                Some(memo_table.to_internal_table_prost())
268            },
269            is_nested_loop: self.is_nested_loop,
270        })))
271    }
272}
273
274impl ExprRewritable for StreamTemporalJoin {
275    fn has_rewritable_expr(&self) -> bool {
276        true
277    }
278
279    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
280        let mut core = self.core.clone();
281        core.rewrite_exprs(r);
282        Self::new(
283            core,
284            self.eq_join_predicate.rewrite_exprs(r),
285            self.is_nested_loop,
286        )
287        .into()
288    }
289}
290
291impl ExprVisitable for StreamTemporalJoin {
292    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
293        self.core.visit_exprs(v);
294        self.eq_join_predicate.visit_exprs(v);
295    }
296}