risingwave_frontend/optimizer/plan_node/
stream_delta_join.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::ColumnDesc;
17use risingwave_common::util::functional::SameOrElseExt;
18use risingwave_pb::plan_common::JoinType;
19use risingwave_pb::stream_plan::stream_node::NodeBody;
20use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode};
21
22use super::generic::GenericPlanNode;
23use super::stream::prelude::*;
24use super::utils::{Distill, childless_record};
25use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, generic};
26use crate::expr::{Expr, ExprRewriter, ExprVisitor};
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::utils::IndicesDisplay;
29use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay, TryToStreamPb};
30use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
31use crate::scheduler::SchedulerResult;
32use crate::stream_fragmenter::BuildFragmentGraphState;
33
34/// [`StreamDeltaJoin`] implements [`super::LogicalJoin`] with delta join. It requires its two
35/// inputs to be indexes.
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct StreamDeltaJoin {
38    pub base: PlanBase<Stream>,
39    core: generic::Join<PlanRef>,
40
41    /// The join condition must be equivalent to `logical.on`, but separated into equal and
42    /// non-equal parts to facilitate execution later
43    eq_join_predicate: EqJoinPredicate,
44}
45
46impl StreamDeltaJoin {
47    pub fn new(core: generic::Join<PlanRef>, eq_join_predicate: EqJoinPredicate) -> Self {
48        let ctx = core.ctx();
49
50        // Inner join won't change the append-only behavior of the stream. The rest might.
51        let append_only = match core.join_type {
52            JoinType::Inner => core.left.append_only() && core.right.append_only(),
53            _ => todo!("delta join only supports inner join for now"),
54        };
55        if eq_join_predicate.has_non_eq() {
56            todo!("non-eq condition not supported for delta join");
57        }
58
59        // FIXME: delta join could have arbitrary distribution.
60        let dist = Distribution::SomeShard;
61
62        let watermark_columns = {
63            let from_left = core
64                .left
65                .watermark_columns()
66                .map_clone(&core.l2i_col_mapping());
67            let from_right = core
68                .right
69                .watermark_columns()
70                .map_clone(&core.r2i_col_mapping());
71            let mut res = WatermarkColumns::new();
72            for (idx, l_wtmk_group) in from_left.iter() {
73                if let Some(r_wtmk_group) = from_right.get_group(idx) {
74                    res.insert(
75                        idx,
76                        l_wtmk_group.same_or_else(r_wtmk_group, || ctx.next_watermark_group_id()),
77                    );
78                }
79            }
80            res.map_clone(&core.i2o_col_mapping())
81        };
82
83        // TODO: derive from input
84        let base = PlanBase::new_stream_with_core(
85            &core,
86            dist,
87            append_only,
88            false, // TODO(rc): derive EOWC property from input
89            watermark_columns,
90            MonotonicityMap::new(), // TODO: derive monotonicity
91        );
92
93        Self {
94            base,
95            core,
96            eq_join_predicate,
97        }
98    }
99
100    /// Get a reference to the delta hash join's eq join predicate.
101    pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
102        &self.eq_join_predicate
103    }
104}
105
106impl Distill for StreamDeltaJoin {
107    fn distill<'a>(&self) -> XmlNode<'a> {
108        let verbose = self.base.ctx().is_explain_verbose();
109        let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
110        vec.push(("type", Pretty::debug(&self.core.join_type)));
111
112        let concat_schema = self.core.concat_schema();
113        vec.push((
114            "predicate",
115            Pretty::debug(&EqJoinPredicateDisplay {
116                eq_join_predicate: self.eq_join_predicate(),
117                input_schema: &concat_schema,
118            }),
119        ));
120
121        if verbose {
122            let data = IndicesDisplay::from_join(&self.core, &concat_schema);
123            vec.push(("output", data));
124        }
125
126        childless_record("StreamDeltaJoin", vec)
127    }
128}
129
130impl PlanTreeNodeBinary for StreamDeltaJoin {
131    fn left(&self) -> PlanRef {
132        self.core.left.clone()
133    }
134
135    fn right(&self) -> PlanRef {
136        self.core.right.clone()
137    }
138
139    fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
140        let mut core = self.core.clone();
141        core.left = left;
142        core.right = right;
143        Self::new(core, self.eq_join_predicate.clone())
144    }
145}
146
147impl_plan_tree_node_for_binary! { StreamDeltaJoin }
148
149impl TryToStreamPb for StreamDeltaJoin {
150    fn try_to_stream_prost_body(
151        &self,
152        _state: &mut BuildFragmentGraphState,
153    ) -> SchedulerResult<NodeBody> {
154        let left = self.left();
155        let right = self.right();
156
157        let left_table = if let Some(stream_table_scan) = left.as_stream_table_scan() {
158            stream_table_scan.core()
159        } else {
160            unreachable!();
161        };
162        let left_table_desc = &*left_table.table_desc;
163        let right_table = if let Some(stream_table_scan) = right.as_stream_table_scan() {
164            stream_table_scan.core()
165        } else {
166            unreachable!();
167        };
168        let right_table_desc = &*right_table.table_desc;
169
170        // TODO: add a separate delta join node in proto, or move fragmenter to frontend so that we
171        // don't need an intermediate representation.
172        let eq_join_predicate = &self.eq_join_predicate;
173        Ok(NodeBody::DeltaIndexJoin(Box::new(DeltaIndexJoinNode {
174            join_type: self.core.join_type as i32,
175            left_key: eq_join_predicate
176                .left_eq_indexes()
177                .iter()
178                .map(|v| *v as i32)
179                .collect(),
180            right_key: eq_join_predicate
181                .right_eq_indexes()
182                .iter()
183                .map(|v| *v as i32)
184                .collect(),
185            condition: eq_join_predicate
186                .other_cond()
187                .as_expr_unless_true()
188                .map(|x| x.to_expr_proto()),
189            left_table_id: left_table_desc.table_id.table_id(),
190            right_table_id: right_table_desc.table_id.table_id(),
191            left_info: Some(ArrangementInfo {
192                // TODO: remove it
193                arrange_key_orders: left_table_desc.arrange_key_orders_protobuf(),
194                // TODO: remove it
195                column_descs: left_table
196                    .column_descs()
197                    .iter()
198                    .map(ColumnDesc::to_protobuf)
199                    .collect(),
200                table_desc: Some(left_table_desc.try_to_protobuf()?),
201                output_col_idx: left_table
202                    .output_col_idx
203                    .iter()
204                    .map(|&v| v as u32)
205                    .collect(),
206            }),
207            right_info: Some(ArrangementInfo {
208                // TODO: remove it
209                arrange_key_orders: right_table_desc.arrange_key_orders_protobuf(),
210                // TODO: remove it
211                column_descs: right_table
212                    .column_descs()
213                    .iter()
214                    .map(ColumnDesc::to_protobuf)
215                    .collect(),
216                table_desc: Some(right_table_desc.try_to_protobuf()?),
217                output_col_idx: right_table
218                    .output_col_idx
219                    .iter()
220                    .map(|&v| v as u32)
221                    .collect(),
222            }),
223            output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
224        })))
225    }
226}
227
228impl ExprRewritable for StreamDeltaJoin {
229    fn has_rewritable_expr(&self) -> bool {
230        true
231    }
232
233    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
234        let mut core = self.core.clone();
235        core.rewrite_exprs(r);
236        Self::new(core, self.eq_join_predicate.rewrite_exprs(r)).into()
237    }
238}
239impl ExprVisitable for StreamDeltaJoin {
240    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
241        self.core.visit_exprs(v);
242    }
243}