risingwave_frontend/optimizer/plan_node/
stream_delta_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::catalog::ColumnDesc;
19use risingwave_common::util::functional::SameOrElseExt;
20use risingwave_pb::plan_common::JoinType;
21use risingwave_pb::stream_plan::stream_node::NodeBody;
22use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode};
23
24use super::generic::GenericPlanNode;
25use super::stream::prelude::*;
26use super::utils::{Distill, childless_record};
27use super::{ExprRewritable, PlanBase, PlanTreeNodeBinary, StreamPlanRef as PlanRef, generic};
28use crate::expr::{Expr, ExprRewriter, ExprVisitor};
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::IndicesDisplay;
31use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay, TryToStreamPb};
32use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
33use crate::scheduler::SchedulerResult;
34use crate::stream_fragmenter::BuildFragmentGraphState;
35
36/// [`StreamDeltaJoin`] implements [`super::LogicalJoin`] with delta join. It requires its two
37/// inputs to be indexes.
38#[derive(Debug, Clone, PartialEq, Eq, Hash)]
39pub struct StreamDeltaJoin {
40    pub base: PlanBase<Stream>,
41    core: generic::Join<PlanRef>,
42
43    /// The join condition must be equivalent to `logical.on`, but separated into equal and
44    /// non-equal parts to facilitate execution later
45    eq_join_predicate: EqJoinPredicate,
46}
47
48impl StreamDeltaJoin {
49    pub fn new(core: generic::Join<PlanRef>, eq_join_predicate: EqJoinPredicate) -> Result<Self> {
50        let ctx = core.ctx();
51
52        if eq_join_predicate.has_non_eq() {
53            todo!("non-eq condition not supported for delta join");
54        }
55        assert_matches!(
56            core.join_type,
57            JoinType::Inner,
58            "delta join only supports inner join for now"
59        );
60
61        // FIXME: delta join could have arbitrary distribution.
62        let dist = Distribution::SomeShard;
63
64        let watermark_columns = {
65            let from_left = core
66                .left
67                .watermark_columns()
68                .map_clone(&core.l2i_col_mapping());
69            let from_right = core
70                .right
71                .watermark_columns()
72                .map_clone(&core.r2i_col_mapping());
73            let mut res = WatermarkColumns::new();
74            for (idx, l_wtmk_group) in from_left.iter() {
75                if let Some(r_wtmk_group) = from_right.get_group(idx) {
76                    res.insert(
77                        idx,
78                        l_wtmk_group.same_or_else(r_wtmk_group, || ctx.next_watermark_group_id()),
79                    );
80                }
81            }
82            res.map_clone(&core.i2o_col_mapping())
83        };
84
85        // TODO: derive from input
86        let base = PlanBase::new_stream_with_core(
87            &core,
88            dist,
89            core.stream_kind()?,
90            false, // TODO(rc): derive EOWC property from input
91            watermark_columns,
92            MonotonicityMap::new(), // TODO: derive monotonicity
93        );
94
95        Ok(Self {
96            base,
97            core,
98            eq_join_predicate,
99        })
100    }
101
102    /// Get a reference to the delta hash join's eq join predicate.
103    pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
104        &self.eq_join_predicate
105    }
106}
107
108impl Distill for StreamDeltaJoin {
109    fn distill<'a>(&self) -> XmlNode<'a> {
110        let verbose = self.base.ctx().is_explain_verbose();
111        let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
112        vec.push(("type", Pretty::debug(&self.core.join_type)));
113
114        let concat_schema = self.core.concat_schema();
115        vec.push((
116            "predicate",
117            Pretty::debug(&EqJoinPredicateDisplay {
118                eq_join_predicate: self.eq_join_predicate(),
119                input_schema: &concat_schema,
120            }),
121        ));
122
123        if verbose {
124            let data = IndicesDisplay::from_join(&self.core, &concat_schema);
125            vec.push(("output", data));
126        }
127
128        childless_record("StreamDeltaJoin", vec)
129    }
130}
131
132impl PlanTreeNodeBinary<Stream> for StreamDeltaJoin {
133    fn left(&self) -> PlanRef {
134        self.core.left.clone()
135    }
136
137    fn right(&self) -> PlanRef {
138        self.core.right.clone()
139    }
140
141    fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
142        let mut core = self.core.clone();
143        core.left = left;
144        core.right = right;
145        Self::new(core, self.eq_join_predicate.clone()).unwrap()
146    }
147}
148
149impl_plan_tree_node_for_binary! { Stream, StreamDeltaJoin }
150
151impl TryToStreamPb for StreamDeltaJoin {
152    fn try_to_stream_prost_body(
153        &self,
154        _state: &mut BuildFragmentGraphState,
155    ) -> SchedulerResult<NodeBody> {
156        let left = self.left();
157        let right = self.right();
158
159        let left_table = if let Some(stream_table_scan) = left.as_stream_table_scan() {
160            stream_table_scan.core()
161        } else {
162            unreachable!();
163        };
164        let left_table_catalog = &*left_table.table_catalog;
165        let right_table = if let Some(stream_table_scan) = right.as_stream_table_scan() {
166            stream_table_scan.core()
167        } else {
168            unreachable!();
169        };
170        let right_table_catalog = &*right_table.table_catalog;
171
172        // TODO: add a separate delta join node in proto, or move fragmenter to frontend so that we
173        // don't need an intermediate representation.
174        let eq_join_predicate = &self.eq_join_predicate;
175        Ok(NodeBody::DeltaIndexJoin(Box::new(DeltaIndexJoinNode {
176            join_type: self.core.join_type as i32,
177            left_key: eq_join_predicate
178                .left_eq_indexes()
179                .iter()
180                .map(|v| *v as i32)
181                .collect(),
182            right_key: eq_join_predicate
183                .right_eq_indexes()
184                .iter()
185                .map(|v| *v as i32)
186                .collect(),
187            condition: eq_join_predicate
188                .other_cond()
189                .as_expr_unless_true()
190                .map(|x| x.to_expr_proto()),
191            left_table_id: left_table_catalog.id.table_id(),
192            right_table_id: right_table_catalog.id.table_id(),
193            left_info: Some(ArrangementInfo {
194                // TODO: remove it
195                arrange_key_orders: left_table_catalog.arrange_key_orders_protobuf(),
196                // TODO: remove it
197                column_descs: left_table
198                    .column_descs()
199                    .iter()
200                    .map(ColumnDesc::to_protobuf)
201                    .collect(),
202                table_desc: Some(left_table_catalog.table_desc().try_to_protobuf()?),
203                output_col_idx: left_table
204                    .output_col_idx
205                    .iter()
206                    .map(|&v| v as u32)
207                    .collect(),
208            }),
209            right_info: Some(ArrangementInfo {
210                // TODO: remove it
211                arrange_key_orders: right_table_catalog.arrange_key_orders_protobuf(),
212                // TODO: remove it
213                column_descs: right_table
214                    .column_descs()
215                    .iter()
216                    .map(ColumnDesc::to_protobuf)
217                    .collect(),
218                table_desc: Some(right_table_catalog.table_desc().try_to_protobuf()?),
219                output_col_idx: right_table
220                    .output_col_idx
221                    .iter()
222                    .map(|&v| v as u32)
223                    .collect(),
224            }),
225            output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
226        })))
227    }
228}
229
230impl ExprRewritable<Stream> for StreamDeltaJoin {
231    fn has_rewritable_expr(&self) -> bool {
232        true
233    }
234
235    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
236        let mut core = self.core.clone();
237        core.rewrite_exprs(r);
238        Self::new(core, self.eq_join_predicate.rewrite_exprs(r))
239            .unwrap()
240            .into()
241    }
242}
243impl ExprVisitable for StreamDeltaJoin {
244    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
245        self.core.visit_exprs(v);
246    }
247}