risingwave_frontend/optimizer/plan_node/
stream_asof_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::util::sort_util::OrderType;
18use risingwave_pb::plan_common::{AsOfJoinDesc, AsOfJoinType, JoinType};
19use risingwave_pb::stream_plan::AsOfJoinNode;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21
22use super::stream::prelude::*;
23use super::utils::{
24    Distill, TableCatalogBuilder, childless_record, plan_node_name, watermark_pretty,
25};
26use super::{
27    ExprRewritable, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamJoinCommon,
28    StreamNode, generic,
29};
30use crate::TableCatalog;
31use crate::expr::{ExprRewriter, ExprVisitor};
32use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
33use crate::optimizer::plan_node::utils::IndicesDisplay;
34use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
35use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
36use crate::stream_fragmenter::BuildFragmentGraphState;
37
38/// [`StreamAsOfJoin`] implements [`super::LogicalJoin`] with hash tables.
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40pub struct StreamAsOfJoin {
41    pub base: PlanBase<Stream>,
42    core: generic::Join<PlanRef>,
43
44    /// The join condition must be equivalent to `logical.on`, but separated into equal and
45    /// non-equal parts to facilitate execution later
46    eq_join_predicate: EqJoinPredicate,
47
48    /// Whether can optimize for append-only stream.
49    /// It is true if input of both side is append-only
50    is_append_only: bool,
51
52    /// inequality description
53    inequality_desc: AsOfJoinDesc,
54}
55
56impl StreamAsOfJoin {
57    pub fn new(
58        core: generic::Join<PlanRef>,
59        eq_join_predicate: EqJoinPredicate,
60        inequality_desc: AsOfJoinDesc,
61    ) -> Self {
62        assert!(core.join_type == JoinType::AsofInner || core.join_type == JoinType::AsofLeftOuter);
63
64        // Inner join won't change the append-only behavior of the stream. The rest might.
65        let append_only = match core.join_type {
66            JoinType::Inner => core.left.append_only() && core.right.append_only(),
67            _ => false,
68        };
69
70        let dist = StreamJoinCommon::derive_dist(
71            core.left.distribution(),
72            core.right.distribution(),
73            &core,
74        );
75
76        // TODO: derive watermarks
77        let watermark_columns = WatermarkColumns::new();
78
79        // TODO: derive from input
80        let base = PlanBase::new_stream_with_core(
81            &core,
82            dist,
83            append_only,
84            false, // TODO(rc): derive EOWC property from input
85            watermark_columns,
86            MonotonicityMap::new(), // TODO: derive monotonicity
87        );
88
89        Self {
90            base,
91            core,
92            eq_join_predicate,
93            is_append_only: append_only,
94            inequality_desc,
95        }
96    }
97
98    /// Get join type
99    pub fn join_type(&self) -> JoinType {
100        self.core.join_type
101    }
102
103    /// Get a reference to the `AsOf` join's eq join predicate.
104    pub fn eq_join_predicate(&self) -> &EqJoinPredicate {
105        &self.eq_join_predicate
106    }
107
108    pub fn derive_dist_key_in_join_key(&self) -> Vec<usize> {
109        let left_dk_indices = self.left().distribution().dist_column_indices().to_vec();
110        let right_dk_indices = self.right().distribution().dist_column_indices().to_vec();
111
112        StreamJoinCommon::get_dist_key_in_join_key(
113            &left_dk_indices,
114            &right_dk_indices,
115            self.eq_join_predicate(),
116        )
117    }
118
119    /// Return stream asof join internal table catalog.
120    pub fn infer_internal_table_catalog<I: StreamPlanRef>(
121        input: I,
122        join_key_indices: Vec<usize>,
123        dk_indices_in_jk: Vec<usize>,
124        inequality_key_idx: usize,
125    ) -> (TableCatalog, Vec<usize>) {
126        let schema = input.schema();
127
128        let internal_table_dist_keys = dk_indices_in_jk
129            .iter()
130            .map(|idx| join_key_indices[*idx])
131            .collect_vec();
132
133        // The pk of AsOf join internal table should be join_key + inequality_key + input_pk.
134        let join_key_len = join_key_indices.len();
135        let mut pk_indices = join_key_indices;
136
137        // dedup the pk in dist key..
138        let mut deduped_input_pk_indices = vec![];
139        for input_pk_idx in input.stream_key().unwrap() {
140            if !pk_indices.contains(input_pk_idx)
141                && !deduped_input_pk_indices.contains(input_pk_idx)
142            {
143                deduped_input_pk_indices.push(*input_pk_idx);
144            }
145        }
146
147        pk_indices.push(inequality_key_idx);
148        pk_indices.extend(deduped_input_pk_indices.clone());
149
150        // Build internal table
151        let mut internal_table_catalog_builder = TableCatalogBuilder::default();
152        let internal_columns_fields = schema.fields().to_vec();
153
154        internal_columns_fields.iter().for_each(|field| {
155            internal_table_catalog_builder.add_column(field);
156        });
157        pk_indices.iter().for_each(|idx| {
158            internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
159        });
160
161        internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk.clone());
162
163        (
164            internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len),
165            deduped_input_pk_indices,
166        )
167    }
168}
169
170impl Distill for StreamAsOfJoin {
171    fn distill<'a>(&self) -> XmlNode<'a> {
172        let (ljk, rjk) = self
173            .eq_join_predicate
174            .eq_indexes()
175            .first()
176            .cloned()
177            .expect("first join key");
178
179        let name = plan_node_name!("StreamAsOfJoin",
180            { "window", self.left().watermark_columns().contains(ljk) && self.right().watermark_columns().contains(rjk) },
181            { "append_only", self.is_append_only },
182        );
183        let verbose = self.base.ctx().is_explain_verbose();
184        let mut vec = Vec::with_capacity(6);
185        vec.push(("type", Pretty::debug(&self.core.join_type)));
186
187        let concat_schema = self.core.concat_schema();
188        vec.push((
189            "predicate",
190            Pretty::debug(&EqJoinPredicateDisplay {
191                eq_join_predicate: self.eq_join_predicate(),
192                input_schema: &concat_schema,
193            }),
194        ));
195
196        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
197            vec.push(("output_watermarks", ow));
198        }
199
200        if verbose {
201            let data = IndicesDisplay::from_join(&self.core, &concat_schema);
202            vec.push(("output", data));
203        }
204
205        childless_record(name, vec)
206    }
207}
208
209impl PlanTreeNodeBinary for StreamAsOfJoin {
210    fn left(&self) -> PlanRef {
211        self.core.left.clone()
212    }
213
214    fn right(&self) -> PlanRef {
215        self.core.right.clone()
216    }
217
218    fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
219        let mut core = self.core.clone();
220        core.left = left;
221        core.right = right;
222        Self::new(core, self.eq_join_predicate.clone(), self.inequality_desc)
223    }
224}
225
226impl_plan_tree_node_for_binary! { StreamAsOfJoin }
227
228impl StreamNode for StreamAsOfJoin {
229    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
230        let left_jk_indices = self.eq_join_predicate.left_eq_indexes();
231        let right_jk_indices = self.eq_join_predicate.right_eq_indexes();
232        let left_jk_indices_prost = left_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
233        let right_jk_indices_prost = right_jk_indices.iter().map(|idx| *idx as i32).collect_vec();
234
235        let dk_indices_in_jk = self.derive_dist_key_in_join_key();
236
237        let (left_table, left_deduped_input_pk_indices) = Self::infer_internal_table_catalog(
238            self.left().plan_base(),
239            left_jk_indices,
240            dk_indices_in_jk.clone(),
241            self.inequality_desc.left_idx as usize,
242        );
243        let (right_table, right_deduped_input_pk_indices) = Self::infer_internal_table_catalog(
244            self.right().plan_base(),
245            right_jk_indices,
246            dk_indices_in_jk,
247            self.inequality_desc.right_idx as usize,
248        );
249
250        let left_deduped_input_pk_indices = left_deduped_input_pk_indices
251            .iter()
252            .map(|idx| *idx as u32)
253            .collect_vec();
254
255        let right_deduped_input_pk_indices = right_deduped_input_pk_indices
256            .iter()
257            .map(|idx| *idx as u32)
258            .collect_vec();
259
260        let left_table = left_table.with_id(state.gen_table_id_wrapped());
261        let right_table = right_table.with_id(state.gen_table_id_wrapped());
262
263        let null_safe_prost = self.eq_join_predicate.null_safes().into_iter().collect();
264
265        let asof_join_type = match self.core.join_type {
266            JoinType::AsofInner => AsOfJoinType::Inner,
267            JoinType::AsofLeftOuter => AsOfJoinType::LeftOuter,
268            _ => unreachable!(),
269        };
270
271        NodeBody::AsOfJoin(Box::new(AsOfJoinNode {
272            join_type: asof_join_type.into(),
273            left_key: left_jk_indices_prost,
274            right_key: right_jk_indices_prost,
275            null_safe: null_safe_prost,
276            left_table: Some(left_table.to_internal_table_prost()),
277            right_table: Some(right_table.to_internal_table_prost()),
278            left_deduped_input_pk_indices,
279            right_deduped_input_pk_indices,
280            output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
281            asof_desc: Some(self.inequality_desc),
282        }))
283    }
284}
285
286impl ExprRewritable for StreamAsOfJoin {
287    fn has_rewritable_expr(&self) -> bool {
288        true
289    }
290
291    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
292        let mut core = self.core.clone();
293        core.rewrite_exprs(r);
294        let eq_join_predicate = self.eq_join_predicate.rewrite_exprs(r);
295        let desc = LogicalJoin::get_inequality_desc_from_predicate(
296            eq_join_predicate.other_cond().clone(),
297            core.left.schema().len(),
298        )
299        .unwrap();
300        Self::new(core, eq_join_predicate, desc).into()
301    }
302}
303
304impl ExprVisitable for StreamAsOfJoin {
305    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
306        self.core.visit_exprs(v);
307    }
308}