risingwave_frontend/optimizer/plan_node/
stream_temporal_join.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use itertools::Itertools;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::util::sort_util::OrderType;
18use risingwave_pb::plan_common::JoinType;
19use risingwave_pb::stream_plan::TemporalJoinNode;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21use risingwave_sqlparser::ast::AsOf;
22
23use super::stream::prelude::*;
24use super::utils::{Distill, childless_record, watermark_pretty};
25use super::{ExprRewritable, PlanBase, PlanTreeNodeBinary, StreamPlanRef as PlanRef, generic};
26use crate::TableCatalog;
27use crate::expr::{Expr, ExprRewriter, ExprVisitor};
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::generic::GenericPlanNode;
30use crate::optimizer::plan_node::plan_tree_node::PlanTreeNodeUnary;
31use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
32use crate::optimizer::plan_node::{
33    EqJoinPredicate, EqJoinPredicateDisplay, StreamExchange, StreamTableScan, TryToStreamPb,
34};
35use crate::scheduler::SchedulerResult;
36use crate::stream_fragmenter::BuildFragmentGraphState;
37use crate::utils::ColIndexMappingRewriteExt;
38
/// Stream plan node for a temporal join.
///
/// The constructor requires the join predicate to be an equi-join predicate,
/// the join type to be `Inner` or `LeftOuter`, and the right input to be a
/// no-shuffle `StreamExchange` over a `StreamTableScan` read
/// `AS OF` process time.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamTemporalJoin {
    /// Stream plan properties (distribution, stream kind, watermark columns,
    /// column monotonicity) derived from `core` in `new`.
    pub base: PlanBase<Stream>,
    /// The generic join description: inputs, predicate, join type and output indices.
    core: generic::Join<PlanRef>,
    /// Whether the left input is append-only; when it is, no memo table is
    /// attached to the serialized node.
    append_only: bool,
    /// Whether this is a nested-loop temporal join rather than a hash one.
    /// Only allowed together with `append_only`.
    is_nested_loop: bool,
}
46
47impl StreamTemporalJoin {
48    pub fn new(core: generic::Join<PlanRef>, is_nested_loop: bool) -> Result<Self> {
49        core.on
50            .as_eq_predicate_ref()
51            .expect("StreamTemporalJoin requires JoinOn::EqPredicate in core");
52        assert!(core.join_type == JoinType::Inner || core.join_type == JoinType::LeftOuter);
53        // TODO(kind): theoretically, the impl can handle upsert stream.
54        let stream_kind = reject_upsert_input!(core.left);
55        let append_only = stream_kind.is_append_only();
56        assert!(!is_nested_loop || append_only);
57
58        let right = core.right.clone();
59        let exchange: &StreamExchange = right
60            .as_stream_exchange()
61            .expect("should be a no shuffle stream exchange");
62        assert!(exchange.no_shuffle());
63        let exchange_input = exchange.input();
64        let scan: &StreamTableScan = exchange_input
65            .as_stream_table_scan()
66            .expect("should be a stream table scan");
67        assert!(matches!(scan.core().as_of, Some(AsOf::ProcessTime)));
68
69        let dist = if is_nested_loop {
70            // Use right side distribution directly if it's nested loop temporal join.
71            let r2o = core.r2i_col_mapping().composite(&core.i2o_col_mapping());
72            r2o.rewrite_provided_distribution(core.right.distribution())
73        } else {
74            // Use left side distribution directly if it's hash temporal join.
75            // https://github.com/risingwavelabs/risingwave/pull/19201#discussion_r1824031780
76            let l2o = core.l2i_col_mapping().composite(&core.i2o_col_mapping());
77            l2o.rewrite_provided_distribution(core.left.distribution())
78        };
79
80        // Use left side watermark directly.
81        let watermark_columns = core
82            .left
83            .watermark_columns()
84            .map_clone(&core.l2i_col_mapping())
85            .map_clone(&core.i2o_col_mapping());
86
87        let columns_monotonicity = core.i2o_col_mapping().rewrite_monotonicity_map(
88            &core
89                .l2i_col_mapping()
90                .rewrite_monotonicity_map(core.left.columns_monotonicity()),
91        );
92
93        let base = PlanBase::new_stream_with_core(
94            &core,
95            dist,
96            stream_kind,
97            false, // TODO(rc): derive EOWC property from input
98            watermark_columns,
99            columns_monotonicity,
100        );
101
102        Ok(Self {
103            base,
104            core,
105            append_only,
106            is_nested_loop,
107        })
108    }
109
110    /// Get join type
111    pub fn join_type(&self) -> JoinType {
112        self.core.join_type
113    }
114
115    pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
116        self.core
117            .on
118            .as_eq_predicate_ref()
119            .expect("StreamTemporalJoin should store predicate as EqJoinPredicate")
120    }
121
122    pub fn append_only(&self) -> bool {
123        self.append_only
124    }
125
126    pub fn is_nested_loop(&self) -> bool {
127        self.is_nested_loop
128    }
129
130    /// Return memo-table catalog and its `pk_indices`.
131    /// (`join_key` + `left_pk` + `right_pk`) -> (`right_scan_schema` + `join_key` + `left_pk`)
132    ///
133    /// Write pattern:
134    ///   for each left input row (with insert op), construct the memo table pk and insert the row into the memo table.
135    ///
136    /// Read pattern:
137    ///   for each left input row (with delete op), construct pk prefix (`join_key` + `left_pk`) to fetch rows and delete them from the memo table.
138    pub fn infer_memo_table_catalog(&self, right_scan: &StreamTableScan) -> TableCatalog {
139        let left_eq_indexes = self.eq_join_predicate().left_eq_indexes();
140        let read_prefix_len_hint = left_eq_indexes.len() + self.left().stream_key().unwrap().len();
141
142        // Build internal table
143        let mut internal_table_catalog_builder = TableCatalogBuilder::default();
144        // Add right table fields
145        let right_scan_schema = right_scan.core().schema();
146        for field in right_scan_schema.fields() {
147            internal_table_catalog_builder.add_column(field);
148        }
149        // Add join_key + left_pk
150        for field in left_eq_indexes
151            .iter()
152            .chain(self.core.left.stream_key().unwrap())
153            .map(|idx| &self.core.left.schema().fields()[*idx])
154        {
155            internal_table_catalog_builder.add_column(field);
156        }
157
158        let mut pk_indices = vec![];
159        pk_indices
160            .extend(right_scan_schema.len()..(right_scan_schema.len() + read_prefix_len_hint));
161        pk_indices.extend(right_scan.stream_key().unwrap());
162
163        pk_indices.iter().for_each(|idx| {
164            internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
165        });
166
167        let dist_key_len = right_scan
168            .core()
169            .distribution_key()
170            .map(|keys| keys.len())
171            .unwrap_or(0);
172
173        let internal_table_dist_keys =
174            (right_scan_schema.len()..(right_scan_schema.len() + dist_key_len)).collect();
175        internal_table_catalog_builder.build(internal_table_dist_keys, read_prefix_len_hint)
176    }
177}
178
179impl Distill for StreamTemporalJoin {
180    fn distill<'a>(&self) -> XmlNode<'a> {
181        let verbose = self.base.ctx().is_explain_verbose();
182        let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
183        vec.push(("type", Pretty::debug(&self.core.join_type)));
184        vec.push(("append_only", Pretty::debug(&self.append_only)));
185
186        let concat_schema = self.core.concat_schema();
187        vec.push((
188            "predicate",
189            Pretty::debug(&EqJoinPredicateDisplay {
190                eq_join_predicate: self.eq_join_predicate(),
191                input_schema: &concat_schema,
192            }),
193        ));
194
195        vec.push(("nested_loop", Pretty::debug(&self.is_nested_loop)));
196
197        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
198            vec.push(("output_watermarks", ow));
199        }
200
201        if verbose {
202            let data = IndicesDisplay::from_join(&self.core, &concat_schema);
203            vec.push(("output", data));
204        }
205
206        childless_record("StreamTemporalJoin", vec)
207    }
208}
209
210impl PlanTreeNodeBinary<Stream> for StreamTemporalJoin {
211    fn left(&self) -> PlanRef {
212        self.core.left.clone()
213    }
214
215    fn right(&self) -> PlanRef {
216        self.core.right.clone()
217    }
218
219    fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
220        let mut core = self.core.clone();
221        core.left = left;
222        core.right = right;
223        Self::new(core, self.is_nested_loop).unwrap()
224    }
225}
226
// NOTE(review): this macro is defined elsewhere; presumably it derives the
// generic plan-tree-node plumbing for `StreamTemporalJoin` from its
// `PlanTreeNodeBinary<Stream>` impl — confirm against the macro definition.
impl_plan_tree_node_for_binary! { Stream, StreamTemporalJoin }
228
229impl TryToStreamPb for StreamTemporalJoin {
230    fn try_to_stream_prost_body(
231        &self,
232        state: &mut BuildFragmentGraphState,
233    ) -> SchedulerResult<NodeBody> {
234        let left_jk_indices = self.eq_join_predicate().left_eq_indexes();
235        let right_jk_indices = self.eq_join_predicate().right_eq_indexes();
236        let left_jk_indices_prost = left_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
237        let right_jk_indices_prost = right_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
238
239        let null_safe_prost = self.eq_join_predicate().null_safes().into_iter().collect();
240
241        let right = self.right();
242        let exchange: &StreamExchange = right
243            .as_stream_exchange()
244            .expect("should be a no shuffle stream exchange");
245        assert!(exchange.no_shuffle());
246        let exchange_input = exchange.input();
247        let scan: &StreamTableScan = exchange_input
248            .as_stream_table_scan()
249            .expect("should be a stream table scan");
250
251        Ok(NodeBody::TemporalJoin(Box::new(TemporalJoinNode {
252            join_type: self.core.join_type as i32,
253            left_key: left_jk_indices_prost,
254            right_key: right_jk_indices_prost,
255            null_safe: null_safe_prost,
256            condition: self
257                .eq_join_predicate()
258                .other_cond()
259                .as_expr_unless_true()
260                .map(|expr| {
261                    expr.to_expr_proto_checked_pure(
262                        self.left().stream_kind().is_retract()
263                            || self.right().stream_kind().is_retract(),
264                        "JOIN condition",
265                    )
266                })
267                .transpose()?,
268            output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
269            table_desc: Some(scan.core().table_catalog.table_desc().try_to_protobuf()?),
270            table_output_indices: scan.core().output_col_idx.iter().map(|&i| i as _).collect(),
271            memo_table: if self.append_only {
272                None
273            } else {
274                let mut memo_table = self.infer_memo_table_catalog(scan);
275                memo_table = memo_table.with_id(state.gen_table_id_wrapped());
276                Some(memo_table.to_internal_table_prost())
277            },
278            is_nested_loop: self.is_nested_loop,
279        })))
280    }
281}
282
283impl ExprRewritable<Stream> for StreamTemporalJoin {
284    fn has_rewritable_expr(&self) -> bool {
285        true
286    }
287
288    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
289        let mut core = self.core.clone();
290        core.rewrite_exprs(r);
291        Self::new(core, self.is_nested_loop).unwrap().into()
292    }
293}
294
295impl ExprVisitable for StreamTemporalJoin {
296    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
297        self.core.visit_exprs(v);
298    }
299}