risingwave_frontend/optimizer/plan_node/
stream_delta_join.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::assert_matches::assert_matches;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::catalog::ColumnDesc;
19use risingwave_common::util::functional::SameOrElseExt;
20use risingwave_pb::plan_common::JoinType;
21use risingwave_pb::stream_plan::stream_node::NodeBody;
22use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode};
23
24use super::generic::GenericPlanNode;
25use super::stream::prelude::*;
26use super::utils::{Distill, childless_record};
27use super::{ExprRewritable, PlanBase, PlanTreeNodeBinary, StreamPlanRef as PlanRef, generic};
28use crate::expr::{Expr, ExprRewriter, ExprVisitor};
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::IndicesDisplay;
31use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay, TryToStreamPb};
32use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
33use crate::scheduler::SchedulerResult;
34use crate::stream_fragmenter::BuildFragmentGraphState;
35
36/// [`StreamDeltaJoin`] implements [`super::LogicalJoin`] with delta join. It requires its two
37/// inputs to be indexes.
38#[derive(Debug, Clone, PartialEq, Eq, Hash)]
39pub struct StreamDeltaJoin {
40    pub base: PlanBase<Stream>,
41    core: generic::Join<PlanRef>,
42}
43
44impl StreamDeltaJoin {
45    pub fn new(core: generic::Join<PlanRef>) -> Result<Self> {
46        let ctx = core.ctx();
47
48        let eq_join_predicate = core
49            .on
50            .as_eq_predicate_ref()
51            .expect("StreamDeltaJoin requires JoinOn::EqPredicate in core");
52
53        if eq_join_predicate.has_non_eq() {
54            todo!("non-eq condition not supported for delta join");
55        }
56        assert_matches!(
57            core.join_type,
58            JoinType::Inner,
59            "delta join only supports inner join for now"
60        );
61
62        // FIXME: delta join could have arbitrary distribution.
63        let dist = Distribution::SomeShard;
64
65        let watermark_columns = {
66            let from_left = core
67                .left
68                .watermark_columns()
69                .map_clone(&core.l2i_col_mapping());
70            let from_right = core
71                .right
72                .watermark_columns()
73                .map_clone(&core.r2i_col_mapping());
74            let mut res = WatermarkColumns::new();
75            for (idx, l_wtmk_group) in from_left.iter() {
76                if let Some(r_wtmk_group) = from_right.get_group(idx) {
77                    res.insert(
78                        idx,
79                        l_wtmk_group.same_or_else(r_wtmk_group, || ctx.next_watermark_group_id()),
80                    );
81                }
82            }
83            res.map_clone(&core.i2o_col_mapping())
84        };
85
86        // TODO: derive from input
87        let base = PlanBase::new_stream_with_core(
88            &core,
89            dist,
90            core.stream_kind()?,
91            false, // TODO(rc): derive EOWC property from input
92            watermark_columns,
93            MonotonicityMap::new(), // TODO: derive monotonicity
94        );
95
96        Ok(Self { base, core })
97    }
98
99    /// Get a reference to the delta hash join's eq join predicate.
100    pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
101        self.core
102            .on
103            .as_eq_predicate_ref()
104            .expect("StreamDeltaJoin should store predicate as EqJoinPredicate")
105    }
106}
107
108impl Distill for StreamDeltaJoin {
109    fn distill<'a>(&self) -> XmlNode<'a> {
110        let verbose = self.base.ctx().is_explain_verbose();
111        let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
112        vec.push(("type", Pretty::debug(&self.core.join_type)));
113
114        let concat_schema = self.core.concat_schema();
115        vec.push((
116            "predicate",
117            Pretty::debug(&EqJoinPredicateDisplay {
118                eq_join_predicate: self.eq_join_predicate(),
119                input_schema: &concat_schema,
120            }),
121        ));
122
123        if verbose {
124            let data = IndicesDisplay::from_join(&self.core, &concat_schema);
125            vec.push(("output", data));
126        }
127
128        childless_record("StreamDeltaJoin", vec)
129    }
130}
131
132impl PlanTreeNodeBinary<Stream> for StreamDeltaJoin {
133    fn left(&self) -> PlanRef {
134        self.core.left.clone()
135    }
136
137    fn right(&self) -> PlanRef {
138        self.core.right.clone()
139    }
140
141    fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
142        let mut core = self.core.clone();
143        core.left = left;
144        core.right = right;
145        Self::new(core).unwrap()
146    }
147}
148
149impl_plan_tree_node_for_binary! { Stream, StreamDeltaJoin }
150
151impl TryToStreamPb for StreamDeltaJoin {
152    fn try_to_stream_prost_body(
153        &self,
154        _state: &mut BuildFragmentGraphState,
155    ) -> SchedulerResult<NodeBody> {
156        let left = self.left();
157        let right = self.right();
158        let retract = left.stream_kind().is_retract() || right.stream_kind().is_retract();
159
160        let left_table = if let Some(stream_table_scan) = left.as_stream_table_scan() {
161            stream_table_scan.core()
162        } else {
163            unreachable!();
164        };
165        let left_table_catalog = &*left_table.table_catalog;
166        let right_table = if let Some(stream_table_scan) = right.as_stream_table_scan() {
167            stream_table_scan.core()
168        } else {
169            unreachable!();
170        };
171        let right_table_catalog = &*right_table.table_catalog;
172
173        // TODO: add a separate delta join node in proto, or move fragmenter to frontend so that we
174        // don't need an intermediate representation.
175        let eq_join_predicate = self.eq_join_predicate();
176        Ok(NodeBody::DeltaIndexJoin(Box::new(DeltaIndexJoinNode {
177            join_type: self.core.join_type as i32,
178            left_key: eq_join_predicate
179                .left_eq_indexes()
180                .iter()
181                .map(|v| *v as i32)
182                .collect(),
183            right_key: eq_join_predicate
184                .right_eq_indexes()
185                .iter()
186                .map(|v| *v as i32)
187                .collect(),
188            condition: eq_join_predicate
189                .other_cond()
190                .as_expr_unless_true()
191                .map(|expr| expr.to_expr_proto_checked_pure(retract, "JOIN condition"))
192                .transpose()?,
193            left_table_id: left_table_catalog.id,
194            right_table_id: right_table_catalog.id,
195            left_info: Some(ArrangementInfo {
196                // TODO: remove it
197                arrange_key_orders: left_table_catalog.arrange_key_orders_protobuf(),
198                // TODO: remove it
199                column_descs: left_table
200                    .column_descs()
201                    .iter()
202                    .map(ColumnDesc::to_protobuf)
203                    .collect(),
204                table_desc: Some(left_table_catalog.table_desc().try_to_protobuf()?),
205                output_col_idx: left_table
206                    .output_col_idx
207                    .iter()
208                    .map(|&v| v as u32)
209                    .collect(),
210            }),
211            right_info: Some(ArrangementInfo {
212                // TODO: remove it
213                arrange_key_orders: right_table_catalog.arrange_key_orders_protobuf(),
214                // TODO: remove it
215                column_descs: right_table
216                    .column_descs()
217                    .iter()
218                    .map(ColumnDesc::to_protobuf)
219                    .collect(),
220                table_desc: Some(right_table_catalog.table_desc().try_to_protobuf()?),
221                output_col_idx: right_table
222                    .output_col_idx
223                    .iter()
224                    .map(|&v| v as u32)
225                    .collect(),
226            }),
227            output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
228        })))
229    }
230}
231
232impl ExprRewritable<Stream> for StreamDeltaJoin {
233    fn has_rewritable_expr(&self) -> bool {
234        true
235    }
236
237    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
238        let mut core = self.core.clone();
239        core.rewrite_exprs(r);
240        Self::new(core).unwrap().into()
241    }
242}
243impl ExprVisitable for StreamDeltaJoin {
244    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
245        self.core.visit_exprs(v);
246    }
247}