risingwave_stream/executor/
asof_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::collections::{BTreeMap, HashSet};
15use std::ops::Bound;
16use std::time::Duration;
17
18use either::Either;
19use itertools::Itertools;
20use multimap::MultiMap;
21use risingwave_common::array::Op;
22use risingwave_common::hash::{HashKey, NullBitmap};
23use risingwave_common::util::epoch::EpochPair;
24use risingwave_common::util::iter_util::ZipEqDebug;
25use tokio::time::Instant;
26
27use self::builder::JoinChunkBuilder;
28use super::barrier_align::*;
29use super::join::hash_join::*;
30use super::join::*;
31use super::watermark::*;
32use crate::executor::join::builder::JoinStreamChunkBuilder;
33use crate::executor::prelude::*;
34
/// Number of processed rows between two cache evictions: `evict_cache` evicts the hash tables
/// of both sides each time its row counter reaches this value, then resets the counter.
const EVICT_EVERY_N_ROWS: u32 = 16;
37
/// Returns `true` when every element of `vec1` also occurs in `vec2`.
///
/// Order and duplicates are irrelevant; an empty `vec1` is a subset of any `vec2`.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.iter().all(|candidate| superset.contains(candidate))
}
41
/// Per-side construction parameters of the join executor.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JoinParams {
    /// Indices of the join keys
    pub join_key_indices: Vec<usize>,
    /// Indices of the input pk after dedup
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join key indices and the deduplicated pk indices of one join side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        Self {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
57
/// State and column-mapping metadata kept for one input side of the join.
struct JoinSide<K: HashKey, S: StateStore> {
    /// Stores all data received from this side's input stream.
    ht: JoinHashMap<K, S>,
    /// Indices of the join key columns
    join_key_indices: Vec<usize>,
    /// The data type of all columns without degree.
    all_data_types: Vec<DataType>,
    /// The start position for the side in output new columns
    start_pos: usize,
    /// The mapping from input indices of a side to output columns.
    i2o_mapping: Vec<(usize, usize)>,
    /// The same `(input, output)` pairs as `i2o_mapping`, indexed by input column so that all
    /// output columns of one input column can be looked up at once.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// Whether degree table is needed for this side.
    need_degree_table: bool,
}
73
74impl<K: HashKey, S: StateStore> std::fmt::Debug for JoinSide<K, S> {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        f.debug_struct("JoinSide")
77            .field("join_key_indices", &self.join_key_indices)
78            .field("col_types", &self.all_data_types)
79            .field("start_pos", &self.start_pos)
80            .field("i2o_mapping", &self.i2o_mapping)
81            .field("need_degree_table", &self.need_degree_table)
82            .finish()
83    }
84}
85
86impl<K: HashKey, S: StateStore> JoinSide<K, S> {
87    // WARNING: Please do not call this until we implement it.
88    fn is_dirty(&self) -> bool {
89        unimplemented!()
90    }
91
92    #[expect(dead_code)]
93    fn clear_cache(&mut self) {
94        assert!(
95            !self.is_dirty(),
96            "cannot clear cache while states of hash join are dirty"
97        );
98
99        // TODO: not working with rearranged chain
100        // self.ht.clear();
101    }
102
103    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
104        self.ht.init(epoch).await
105    }
106}
107
/// `AsOfJoinExecutor` takes two input streams and joins them: rows are first matched on the
/// equality join key through a hash table, then a matching row is selected by the inequality
/// described in `asof_desc`.
/// The output columns are the concatenation of left and right columns.
pub struct AsOfJoinExecutor<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> {
    ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Left input executor
    input_l: Option<Executor>,
    /// Right input executor
    input_r: Option<Executor>,
    /// The data types of the formed new columns
    actual_output_data_types: Vec<DataType>,
    /// The parameters of the left join executor
    side_l: JoinSide<K, S>,
    /// The parameters of the right join executor
    side_r: JoinSide<K, S>,

    metrics: Arc<StreamingMetrics>,
    /// The maximum size of the chunk produced by executor at a time
    chunk_size: usize,
    /// Counts the rows received; reset to 0 when it reaches `EVICT_EVERY_N_ROWS`
    cnt_rows_received: u32,

    /// watermark column index -> `BufferedWatermarks`
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// Passed through to the per-chunk join routines via `EqJoinArgs`.
    high_join_amplification_threshold: usize,
    /// `AsOf` join description
    asof_desc: AsOfDesc,
}
138
139impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> std::fmt::Debug
140    for AsOfJoinExecutor<K, S, T>
141{
142    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143        f.debug_struct("AsOfJoinExecutor")
144            .field("join_type", &T)
145            .field("input_left", &self.input_l.as_ref().unwrap().identity())
146            .field("input_right", &self.input_r.as_ref().unwrap().identity())
147            .field("side_l", &self.side_l)
148            .field("side_r", &self.side_r)
149            .field("pk_indices", &self.info.pk_indices)
150            .field("schema", &self.info.schema)
151            .field("actual_output_data_types", &self.actual_output_data_types)
152            .finish()
153    }
154}
155
156impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> Execute
157    for AsOfJoinExecutor<K, S, T>
158{
159    fn execute(self: Box<Self>) -> BoxedMessageStream {
160        self.into_stream().boxed()
161    }
162}
163
/// Arguments handed to `eq_join_left` / `eq_join_right` for processing one input chunk.
///
/// It borrows the executor's fields individually so that the per-chunk join routines can run
/// without borrowing the whole executor.
struct EqJoinArgs<'a, K: HashKey, S: StateStore> {
    /// Actor context of the executor.
    ctx: &'a ActorContextRef,
    /// Left side state.
    side_l: &'a mut JoinSide<K, S>,
    /// Right side state.
    side_r: &'a mut JoinSide<K, S>,
    /// `AsOf` join description (inequality type and columns).
    asof_desc: &'a AsOfDesc,
    /// Data types of the output columns.
    actual_output_data_types: &'a [DataType],
    // inequality_watermarks: &'a Watermark,
    /// The input chunk to join.
    chunk: StreamChunk,
    /// Maximum size of the produced chunks.
    chunk_size: usize,
    /// Row counter shared with `evict_cache`.
    cnt_rows_received: &'a mut u32,
    high_join_amplification_threshold: usize,
}
176
177impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor<K, S, T> {
178    #[allow(clippy::too_many_arguments)]
179    pub fn new(
180        ctx: ActorContextRef,
181        info: ExecutorInfo,
182        input_l: Executor,
183        input_r: Executor,
184        params_l: JoinParams,
185        params_r: JoinParams,
186        null_safe: Vec<bool>,
187        output_indices: Vec<usize>,
188        state_table_l: StateTable<S>,
189        state_table_r: StateTable<S>,
190        watermark_epoch: AtomicU64Ref,
191        metrics: Arc<StreamingMetrics>,
192        chunk_size: usize,
193        high_join_amplification_threshold: usize,
194        asof_desc: AsOfDesc,
195    ) -> Self {
196        let side_l_column_n = input_l.schema().len();
197
198        let schema_fields = [
199            input_l.schema().fields.clone(),
200            input_r.schema().fields.clone(),
201        ]
202        .concat();
203
204        let original_output_data_types = schema_fields
205            .iter()
206            .map(|field| field.data_type())
207            .collect_vec();
208        let actual_output_data_types = output_indices
209            .iter()
210            .map(|&idx| original_output_data_types[idx].clone())
211            .collect_vec();
212
213        // Data types of of hash join state.
214        let state_all_data_types_l = input_l.schema().data_types();
215        let state_all_data_types_r = input_r.schema().data_types();
216
217        let state_pk_indices_l = input_l.pk_indices().to_vec();
218        let state_pk_indices_r = input_r.pk_indices().to_vec();
219
220        let state_join_key_indices_l = params_l.join_key_indices;
221        let state_join_key_indices_r = params_r.join_key_indices;
222
223        // If pk is contained in join key.
224        let pk_contained_in_jk_l =
225            is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
226        let pk_contained_in_jk_r =
227            is_subset(state_pk_indices_r.clone(), state_join_key_indices_r.clone());
228
229        let join_key_data_types_l = state_join_key_indices_l
230            .iter()
231            .map(|idx| state_all_data_types_l[*idx].clone())
232            .collect_vec();
233
234        let join_key_data_types_r = state_join_key_indices_r
235            .iter()
236            .map(|idx| state_all_data_types_r[*idx].clone())
237            .collect_vec();
238
239        assert_eq!(join_key_data_types_l, join_key_data_types_r);
240
241        let null_matched = K::Bitmap::from_bool_vec(null_safe);
242
243        let (left_to_output, right_to_output) = {
244            let (left_len, right_len) = if is_left_semi_or_anti(T) {
245                (state_all_data_types_l.len(), 0usize)
246            } else if is_right_semi_or_anti(T) {
247                (0usize, state_all_data_types_r.len())
248            } else {
249                (state_all_data_types_l.len(), state_all_data_types_r.len())
250            };
251            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
252        };
253
254        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
255        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());
256
257        // handle inequality watermarks
258        // https://github.com/risingwavelabs/risingwave/issues/18503
259        // let inequality_watermarks = None;
260        let watermark_buffers = BTreeMap::new();
261
262        let inequal_key_idx_l = Some(asof_desc.left_idx);
263        let inequal_key_idx_r = Some(asof_desc.right_idx);
264
265        Self {
266            ctx: ctx.clone(),
267            info,
268            input_l: Some(input_l),
269            input_r: Some(input_r),
270            actual_output_data_types,
271            side_l: JoinSide {
272                ht: JoinHashMap::new(
273                    watermark_epoch.clone(),
274                    join_key_data_types_l,
275                    state_join_key_indices_l.clone(),
276                    state_all_data_types_l.clone(),
277                    state_table_l,
278                    params_l.deduped_pk_indices,
279                    None,
280                    null_matched.clone(),
281                    pk_contained_in_jk_l,
282                    inequal_key_idx_l,
283                    metrics.clone(),
284                    ctx.id,
285                    ctx.fragment_id,
286                    "left",
287                ),
288                join_key_indices: state_join_key_indices_l,
289                all_data_types: state_all_data_types_l,
290                i2o_mapping: left_to_output,
291                i2o_mapping_indexed: l2o_indexed,
292                start_pos: 0,
293                need_degree_table: false,
294            },
295            side_r: JoinSide {
296                ht: JoinHashMap::new(
297                    watermark_epoch,
298                    join_key_data_types_r,
299                    state_join_key_indices_r.clone(),
300                    state_all_data_types_r.clone(),
301                    state_table_r,
302                    params_r.deduped_pk_indices,
303                    None,
304                    null_matched,
305                    pk_contained_in_jk_r,
306                    inequal_key_idx_r,
307                    metrics.clone(),
308                    ctx.id,
309                    ctx.fragment_id,
310                    "right",
311                ),
312                join_key_indices: state_join_key_indices_r,
313                all_data_types: state_all_data_types_r,
314                start_pos: side_l_column_n,
315                i2o_mapping: right_to_output,
316                i2o_mapping_indexed: r2o_indexed,
317                need_degree_table: false,
318            },
319            metrics,
320            chunk_size,
321            cnt_rows_received: 0,
322            watermark_buffers,
323            high_join_amplification_threshold,
324            asof_desc,
325        }
326    }
327
    /// Main loop of the executor.
    ///
    /// Aligns the two inputs on barriers, forwards the first barrier and initializes both
    /// sides with its epoch, then processes aligned messages one by one: watermarks go through
    /// `handle_watermark`, chunks through `eq_join_left` / `eq_join_right`, and barriers flush
    /// the state of both sides before being forwarded.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);
        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(barrier);
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Initialize the metrics, labelled by actor, fragment and (where applicable) side.
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        // `start_time` marks the moment we start waiting for the next aligned message.
        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            // Time elapsed between finishing the previous message and receiving this one.
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // `left_time` accumulates only the time spent producing chunks: the timer
                    // is stopped right before each `yield` and restarted right after it.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        asof_desc: &self.asof_desc,
                        actual_output_data_types: &self.actual_output_data_types,
                        // inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Same timing scheme as for the left side.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        asof_desc: &self.asof_desc,
                        actual_output_data_types: &self.actual_output_data_types,
                        // inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    // Flush both sides for the barrier's epoch before forwarding the barrier.
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    yield Message::Barrier(barrier);

                    // Update the vnode bitmap for state tables of both sides if asked.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    // Only the left result is inspected: when it is `Some(true)` the buffered
                    // watermarks are dropped.
                    // NOTE(review): presumably `true` means the cache may be stale after the
                    // vnode bitmap update; confirm against `post_yield_barrier`.
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                    }

                    // Report metrics of cached join rows/entries
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }

                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                }
            }
            start_time = Instant::now();
        }
    }
490
491    async fn flush_data(
492        &mut self,
493        epoch: EpochPair,
494    ) -> StreamExecutorResult<(
495        JoinHashMapPostCommit<'_, K, S>,
496        JoinHashMapPostCommit<'_, K, S>,
497    )> {
498        // All changes to the state has been buffered in the mem-table of the state table. Just
499        // `commit` them here.
500        let left = self.side_l.ht.flush(epoch).await?;
501        let right = self.side_r.ht.flush(epoch).await?;
502        Ok((left, right))
503    }
504
505    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
506        // All changes to the state has been buffered in the mem-table of the state table. Just
507        // `commit` them here.
508        self.side_l.ht.try_flush().await?;
509        self.side_r.ht.try_flush().await?;
510        Ok(())
511    }
512
513    // We need to manually evict the cache.
514    fn evict_cache(
515        side_update: &mut JoinSide<K, S>,
516        side_match: &mut JoinSide<K, S>,
517        cnt_rows_received: &mut u32,
518    ) {
519        *cnt_rows_received += 1;
520        if *cnt_rows_received == EVICT_EVERY_N_ROWS {
521            side_update.ht.evict();
522            side_match.ht.evict();
523            *cnt_rows_received = 0;
524        }
525    }
526
527    fn handle_watermark(
528        &mut self,
529        side: SideTypePrimitive,
530        watermark: Watermark,
531    ) -> StreamExecutorResult<Vec<Watermark>> {
532        let (side_update, side_match) = if side == SideType::Left {
533            (&mut self.side_l, &mut self.side_r)
534        } else {
535            (&mut self.side_r, &mut self.side_l)
536        };
537
538        // State cleaning
539        if side_update.join_key_indices[0] == watermark.col_idx {
540            side_match.ht.update_watermark(watermark.val.clone());
541        }
542
543        // Select watermarks to yield.
544        let wm_in_jk = side_update
545            .join_key_indices
546            .iter()
547            .positions(|idx| *idx == watermark.col_idx);
548        let mut watermarks_to_emit = vec![];
549        for idx in wm_in_jk {
550            let buffers = self
551                .watermark_buffers
552                .entry(idx)
553                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
554            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
555                let empty_indices = vec![];
556                let output_indices = side_update
557                    .i2o_mapping_indexed
558                    .get_vec(&side_update.join_key_indices[idx])
559                    .unwrap_or(&empty_indices)
560                    .iter()
561                    .chain(
562                        side_match
563                            .i2o_mapping_indexed
564                            .get_vec(&side_match.join_key_indices[idx])
565                            .unwrap_or(&empty_indices),
566                    );
567                for output_idx in output_indices {
568                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
569                }
570            };
571        }
572        Ok(watermarks_to_emit)
573    }
574
575    /// the data the hash table and match the coming
576    /// data chunk with the executor state
577    async fn hash_eq_match(
578        key: &K,
579        ht: &mut JoinHashMap<K, S>,
580    ) -> StreamExecutorResult<Option<HashValueType>> {
581        if !key.null_bitmap().is_subset(ht.null_matched()) {
582            Ok(None)
583        } else {
584            ht.take_state(key).await.map(Some)
585        }
586    }
587
588    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
589    async fn eq_join_left(args: EqJoinArgs<'_, K, S>) {
590        let EqJoinArgs {
591            ctx: _,
592            side_l,
593            side_r,
594            asof_desc,
595            actual_output_data_types,
596            // inequality_watermarks,
597            chunk,
598            chunk_size,
599            cnt_rows_received,
600            high_join_amplification_threshold: _,
601        } = args;
602
603        let (side_update, side_match) = (side_l, side_r);
604
605        let mut join_chunk_builder =
606            JoinChunkBuilder::<T, { SideType::Left }>::new(JoinStreamChunkBuilder::new(
607                chunk_size,
608                actual_output_data_types.to_vec(),
609                side_update.i2o_mapping.clone(),
610                side_match.i2o_mapping.clone(),
611            ));
612
613        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
614        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
615            let Some((op, row)) = r else {
616                continue;
617            };
618            Self::evict_cache(side_update, side_match, cnt_rows_received);
619
620            let matched_rows = if !side_update.ht.check_inequal_key_null(&row) {
621                Self::hash_eq_match(key, &mut side_match.ht).await?
622            } else {
623                None
624            };
625            let inequal_key = side_update.ht.serialize_inequal_key_from_row(row);
626
627            if let Some(matched_rows) = matched_rows {
628                let matched_row_by_inequality = match asof_desc.inequality_type {
629                    AsOfInequalityType::Lt => matched_rows.lower_bound_by_inequality(
630                        Bound::Excluded(&inequal_key),
631                        &side_match.all_data_types,
632                    ),
633                    AsOfInequalityType::Le => matched_rows.lower_bound_by_inequality(
634                        Bound::Included(&inequal_key),
635                        &side_match.all_data_types,
636                    ),
637                    AsOfInequalityType::Gt => matched_rows.upper_bound_by_inequality(
638                        Bound::Excluded(&inequal_key),
639                        &side_match.all_data_types,
640                    ),
641                    AsOfInequalityType::Ge => matched_rows.upper_bound_by_inequality(
642                        Bound::Included(&inequal_key),
643                        &side_match.all_data_types,
644                    ),
645                };
646                match op {
647                    Op::Insert | Op::UpdateInsert => {
648                        if let Some(matched_row_by_inequality) = matched_row_by_inequality {
649                            let matched_row = matched_row_by_inequality?;
650
651                            if let Some(chunk) =
652                                join_chunk_builder.with_match_on_insert(&row, &matched_row)
653                            {
654                                yield chunk;
655                            }
656                        } else if let Some(chunk) =
657                            join_chunk_builder.forward_if_not_matched(Op::Insert, row)
658                        {
659                            yield chunk;
660                        }
661                        side_update.ht.insert_row(key, row)?;
662                    }
663                    Op::Delete | Op::UpdateDelete => {
664                        if let Some(matched_row_by_inequality) = matched_row_by_inequality {
665                            let matched_row = matched_row_by_inequality?;
666
667                            if let Some(chunk) =
668                                join_chunk_builder.with_match_on_delete(&row, &matched_row)
669                            {
670                                yield chunk;
671                            }
672                        } else if let Some(chunk) =
673                            join_chunk_builder.forward_if_not_matched(Op::Delete, row)
674                        {
675                            yield chunk;
676                        }
677                        side_update.ht.delete_row(key, row)?;
678                    }
679                }
680                // Insert back the state taken from ht.
681                side_match.ht.update_state(key, matched_rows);
682            } else {
683                // Row which violates null-safe bitmap will never be matched so we need not
684                // store.
685                match op {
686                    Op::Insert | Op::UpdateInsert => {
687                        if let Some(chunk) =
688                            join_chunk_builder.forward_if_not_matched(Op::Insert, row)
689                        {
690                            yield chunk;
691                        }
692                    }
693                    Op::Delete | Op::UpdateDelete => {
694                        if let Some(chunk) =
695                            join_chunk_builder.forward_if_not_matched(Op::Delete, row)
696                        {
697                            yield chunk;
698                        }
699                    }
700                }
701            }
702        }
703        if let Some(chunk) = join_chunk_builder.take() {
704            yield chunk;
705        }
706    }
707
708    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
709    async fn eq_join_right(args: EqJoinArgs<'_, K, S>) {
710        let EqJoinArgs {
711            ctx,
712            side_l,
713            side_r,
714            asof_desc,
715            actual_output_data_types,
716            // inequality_watermarks,
717            chunk,
718            chunk_size,
719            cnt_rows_received,
720            high_join_amplification_threshold,
721        } = args;
722
723        let (side_update, side_match) = (side_r, side_l);
724
725        let mut join_chunk_builder = JoinStreamChunkBuilder::new(
726            chunk_size,
727            actual_output_data_types.to_vec(),
728            side_update.i2o_mapping.clone(),
729            side_match.i2o_mapping.clone(),
730        );
731
732        let join_matched_rows_metrics = ctx
733            .streaming_metrics
734            .join_matched_join_keys
735            .with_guarded_label_values(&[
736                &ctx.id.to_string(),
737                &ctx.fragment_id.to_string(),
738                &side_update.ht.table_id().to_string(),
739            ]);
740
741        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
742        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
743            let Some((op, row)) = r else {
744                continue;
745            };
746            let mut join_matched_rows_cnt = 0;
747
748            Self::evict_cache(side_update, side_match, cnt_rows_received);
749
750            let matched_rows = if !side_update.ht.check_inequal_key_null(&row) {
751                Self::hash_eq_match(key, &mut side_match.ht).await?
752            } else {
753                None
754            };
755            let inequal_key = side_update.ht.serialize_inequal_key_from_row(row);
756
757            if let Some(matched_rows) = matched_rows {
758                let update_rows = Self::hash_eq_match(key, &mut side_update.ht).await?.expect("None is not expected because we have checked null in key when getting matched_rows");
759                let right_inequality_index = update_rows.inequality_index();
760                let (row_to_delete_r, row_to_insert_r) =
761                    if let Some(pks) = right_inequality_index.get(&inequal_key) {
762                        assert!(!pks.is_empty());
763                        let row_pk = side_update.ht.serialize_pk_from_row(row);
764                        match op {
765                            Op::Insert | Op::UpdateInsert => {
766                                // If there are multiple rows match the inequality key in the right table, we use one with smallest pk.
767                                let smallest_pk = pks.first_key_sorted().unwrap();
768                                if smallest_pk > &row_pk {
769                                    // smallest_pk is in the cache index, so it must exist in the cache.
770                                    if let Some(to_delete_row) = update_rows
771                                        .get_by_indexed_pk(smallest_pk, &side_update.all_data_types)
772                                    {
773                                        (
774                                            Some(Either::Left(to_delete_row?.row)),
775                                            Some(Either::Right(row)),
776                                        )
777                                    } else {
778                                        // Something wrong happened. Ignore this row in non strict consistency mode.
779                                        (None, None)
780                                    }
781                                } else {
782                                    // No affected row in the right table.
783                                    (None, None)
784                                }
785                            }
786                            Op::Delete | Op::UpdateDelete => {
787                                let smallest_pk = pks.first_key_sorted().unwrap();
788                                if smallest_pk == &row_pk {
789                                    if let Some(second_smallest_pk) = pks.second_key_sorted() {
790                                        if let Some(to_insert_row) = update_rows.get_by_indexed_pk(
791                                            second_smallest_pk,
792                                            &side_update.all_data_types,
793                                        ) {
794                                            (
795                                                Some(Either::Right(row)),
796                                                Some(Either::Left(to_insert_row?.row)),
797                                            )
798                                        } else {
799                                            // Something wrong happened. Ignore this row in non strict consistency mode.
800                                            (None, None)
801                                        }
802                                    } else {
803                                        (Some(Either::Right(row)), None)
804                                    }
805                                } else {
806                                    // No affected row in the right table.
807                                    (None, None)
808                                }
809                            }
810                        }
811                    } else {
812                        match op {
813                            // Decide the row_to_delete later
814                            Op::Insert | Op::UpdateInsert => (None, Some(Either::Right(row))),
815                            // Decide the row_to_insert later
816                            Op::Delete | Op::UpdateDelete => (Some(Either::Right(row)), None),
817                        }
818                    };
819
820                // 4 cases for row_to_delete_r and row_to_insert_r:
821                // 1. Some(_), Some(_): delete row_to_delete_r and insert row_to_insert_r
822                // 2. None, Some(_)   : row_to_delete to be decided by the nearest inequality key
823                // 3. Some(_), None   : row_to_insert to be decided by the nearest inequality key
824                // 4. None, None      : do nothing
825                if row_to_delete_r.is_none() && row_to_insert_r.is_none() {
826                    // no row to delete or insert.
827                } else {
828                    let prev_inequality_key =
829                        right_inequality_index.upper_bound_key(Bound::Excluded(&inequal_key));
830                    let next_inequality_key =
831                        right_inequality_index.lower_bound_key(Bound::Excluded(&inequal_key));
832                    let affected_row_r = match asof_desc.inequality_type {
833                        AsOfInequalityType::Lt | AsOfInequalityType::Le => next_inequality_key
834                            .and_then(|k| {
835                                update_rows.get_first_by_inequality(k, &side_update.all_data_types)
836                            }),
837                        AsOfInequalityType::Gt | AsOfInequalityType::Ge => prev_inequality_key
838                            .and_then(|k| {
839                                update_rows.get_first_by_inequality(k, &side_update.all_data_types)
840                            }),
841                    }
842                    .transpose()?
843                    .map(|r| Either::Left(r.row));
844
845                    let (row_to_delete_r, row_to_insert_r) =
846                        match (&row_to_delete_r, &row_to_insert_r) {
847                            (Some(_), Some(_)) => (row_to_delete_r, row_to_insert_r),
848                            (None, Some(_)) => (affected_row_r, row_to_insert_r),
849                            (Some(_), None) => (row_to_delete_r, affected_row_r),
850                            (None, None) => unreachable!(),
851                        };
852                    let range = match asof_desc.inequality_type {
853                        AsOfInequalityType::Lt => (
854                            prev_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Included),
855                            Bound::Excluded(&inequal_key),
856                        ),
857                        AsOfInequalityType::Le => (
858                            prev_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Excluded),
859                            Bound::Included(&inequal_key),
860                        ),
861                        AsOfInequalityType::Gt => (
862                            Bound::Excluded(&inequal_key),
863                            next_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Included),
864                        ),
865                        AsOfInequalityType::Ge => (
866                            Bound::Included(&inequal_key),
867                            next_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Excluded),
868                        ),
869                    };
870
871                    let rows_l =
872                        matched_rows.range_by_inequality(range, &side_match.all_data_types);
873                    for row_l in rows_l {
874                        join_matched_rows_cnt += 1;
875                        let row_l = row_l?.row;
876                        if let Some(row_to_delete_r) = &row_to_delete_r {
877                            if let Some(chunk) =
878                                join_chunk_builder.append_row(Op::Delete, row_to_delete_r, &row_l)
879                            {
880                                yield chunk;
881                            }
882                        } else if is_as_of_left_outer(T) {
883                            if let Some(chunk) =
884                                join_chunk_builder.append_row_matched(Op::Delete, &row_l)
885                            {
886                                yield chunk;
887                            }
888                        }
889                        if let Some(row_to_insert_r) = &row_to_insert_r {
890                            if let Some(chunk) =
891                                join_chunk_builder.append_row(Op::Insert, row_to_insert_r, &row_l)
892                            {
893                                yield chunk;
894                            }
895                        } else if is_as_of_left_outer(T) {
896                            if let Some(chunk) =
897                                join_chunk_builder.append_row_matched(Op::Insert, &row_l)
898                            {
899                                yield chunk;
900                            }
901                        }
902                    }
903                }
904                // Insert back the state taken from ht.
905                side_match.ht.update_state(key, matched_rows);
906                side_update.ht.update_state(key, update_rows);
907
908                match op {
909                    Op::Insert | Op::UpdateInsert => {
910                        side_update.ht.insert_row(key, row)?;
911                    }
912                    Op::Delete | Op::UpdateDelete => {
913                        side_update.ht.delete_row(key, row)?;
914                    }
915                }
916            } else {
917                // Row which violates null-safe bitmap will never be matched so we need not
918                // store.
919                // Noop here because we only support left outer AsOf join.
920            }
921            join_matched_rows_metrics.observe(join_matched_rows_cnt as _);
922            if join_matched_rows_cnt > high_join_amplification_threshold {
923                let join_key_data_types = side_update.ht.join_key_data_types();
924                let key = key.deserialize(join_key_data_types)?;
925                tracing::warn!(target: "high_join_amplification",
926                    matched_rows_len = join_matched_rows_cnt,
927                    update_table_id = side_update.ht.table_id(),
928                    match_table_id = side_match.ht.table_id(),
929                    join_key = ?key,
930                    actor_id = ctx.id,
931                    fragment_id = ctx.fragment_id,
932                    "large rows matched for join key when AsOf join updating right side",
933                );
934            }
935        }
936        if let Some(chunk) = join_chunk_builder.take() {
937            yield chunk;
938        }
939    }
940}
941
942#[cfg(test)]
943mod tests {
944    use std::sync::atomic::AtomicU64;
945
946    use risingwave_common::array::*;
947    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
948    use risingwave_common::hash::Key64;
949    use risingwave_common::util::epoch::test_epoch;
950    use risingwave_common::util::sort_util::OrderType;
951    use risingwave_storage::memory::MemoryStateStore;
952
953    use super::*;
954    use crate::common::table::test_utils::gen_pbtable;
955    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
956
957    async fn create_in_memory_state_table(
958        mem_state: MemoryStateStore,
959        data_types: &[DataType],
960        order_types: &[OrderType],
961        pk_indices: &[usize],
962        table_id: u32,
963    ) -> StateTable<MemoryStateStore> {
964        let column_descs = data_types
965            .iter()
966            .enumerate()
967            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
968            .collect_vec();
969        StateTable::from_table_catalog(
970            &gen_pbtable(
971                TableId::new(table_id),
972                column_descs,
973                order_types.to_vec(),
974                pk_indices.to_vec(),
975                0,
976            ),
977            mem_state.clone(),
978            None,
979        )
980        .await
981    }
982
983    async fn create_executor<const T: AsOfJoinTypePrimitive>(
984        asof_desc: AsOfDesc,
985    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
986        let schema = Schema {
987            fields: vec![
988                Field::unnamed(DataType::Int64), // join key
989                Field::unnamed(DataType::Int64),
990                Field::unnamed(DataType::Int64),
991            ],
992        };
993        let (tx_l, source_l) = MockSource::channel();
994        let source_l = source_l.into_executor(schema.clone(), vec![1]);
995        let (tx_r, source_r) = MockSource::channel();
996        let source_r = source_r.into_executor(schema, vec![1]);
997        let params_l = JoinParams::new(vec![0], vec![1]);
998        let params_r = JoinParams::new(vec![0], vec![1]);
999
1000        let mem_state = MemoryStateStore::new();
1001
1002        let state_l = create_in_memory_state_table(
1003            mem_state.clone(),
1004            &[DataType::Int64, DataType::Int64, DataType::Int64],
1005            &[
1006                OrderType::ascending(),
1007                OrderType::ascending(),
1008                OrderType::ascending(),
1009            ],
1010            &[0, asof_desc.left_idx, 1],
1011            0,
1012        )
1013        .await;
1014
1015        let state_r = create_in_memory_state_table(
1016            mem_state,
1017            &[DataType::Int64, DataType::Int64, DataType::Int64],
1018            &[
1019                OrderType::ascending(),
1020                OrderType::ascending(),
1021                OrderType::ascending(),
1022            ],
1023            &[0, asof_desc.right_idx, 1],
1024            1,
1025        )
1026        .await;
1027
1028        let schema: Schema = [source_l.schema().fields(), source_r.schema().fields()]
1029            .concat()
1030            .into_iter()
1031            .collect();
1032        let schema_len = schema.len();
1033        let info = ExecutorInfo::new(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1034
1035        let executor = AsOfJoinExecutor::<Key64, MemoryStateStore, T>::new(
1036            ActorContext::for_test(123),
1037            info,
1038            source_l,
1039            source_r,
1040            params_l,
1041            params_r,
1042            vec![false],
1043            (0..schema_len).collect_vec(),
1044            state_l,
1045            state_r,
1046            Arc::new(AtomicU64::new(0)),
1047            Arc::new(StreamingMetrics::unused()),
1048            1024,
1049            2048,
1050            asof_desc,
1051        );
1052        (tx_l, tx_r, executor.boxed().execute())
1053    }
1054
1055    #[tokio::test]
1056    async fn test_as_of_inner_join() -> StreamExecutorResult<()> {
1057        let asof_desc = AsOfDesc {
1058            left_idx: 0,
1059            right_idx: 2,
1060            inequality_type: AsOfInequalityType::Lt,
1061        };
1062
1063        let chunk_l1 = StreamChunk::from_pretty(
1064            "  I I I
1065             + 1 4 7
1066             + 2 5 8
1067             + 3 6 9",
1068        );
1069        let chunk_l2 = StreamChunk::from_pretty(
1070            "  I I I
1071             + 3 8 1
1072             - 3 8 1",
1073        );
1074        let chunk_r1 = StreamChunk::from_pretty(
1075            "  I I I
1076             + 2 1 7
1077             + 2 2 1
1078             + 2 3 4
1079             + 2 4 2
1080             + 6 1 9
1081             + 6 2 9",
1082        );
1083        let chunk_r2 = StreamChunk::from_pretty(
1084            "  I I I
1085             - 2 3 4",
1086        );
1087        let chunk_r3 = StreamChunk::from_pretty(
1088            "  I I I
1089             + 2 3 3",
1090        );
1091        let chunk_l3 = StreamChunk::from_pretty(
1092            "  I I I
1093             - 2 5 8",
1094        );
1095        let chunk_l4 = StreamChunk::from_pretty(
1096            "  I I I
1097             + 6 3 1
1098             + 6 4 1",
1099        );
1100        let chunk_r4 = StreamChunk::from_pretty(
1101            "  I I I
1102             - 6 1 9",
1103        );
1104
1105        let (mut tx_l, mut tx_r, mut hash_join) =
1106            create_executor::<{ AsOfJoinType::Inner }>(asof_desc).await;
1107
1108        // push the init barrier for left and right
1109        tx_l.push_barrier(test_epoch(1), false);
1110        tx_r.push_barrier(test_epoch(1), false);
1111        hash_join.next_unwrap_ready_barrier()?;
1112
1113        // push the 1st left chunk
1114        tx_l.push_chunk(chunk_l1);
1115        hash_join.next_unwrap_pending();
1116
1117        // push the init barrier for left and right
1118        tx_l.push_barrier(test_epoch(2), false);
1119        tx_r.push_barrier(test_epoch(2), false);
1120        hash_join.next_unwrap_ready_barrier()?;
1121
1122        // push the 2nd left chunk
1123        tx_l.push_chunk(chunk_l2);
1124        hash_join.next_unwrap_pending();
1125
1126        // push the 1st right chunk
1127        tx_r.push_chunk(chunk_r1);
1128        let chunk = hash_join.next_unwrap_ready_chunk()?;
1129        assert_eq!(
1130            chunk,
1131            StreamChunk::from_pretty(
1132                " I I I I I I
1133                + 2 5 8 2 1 7
1134                - 2 5 8 2 1 7
1135                + 2 5 8 2 3 4"
1136            )
1137        );
1138
1139        // push the 2nd right chunk
1140        tx_r.push_chunk(chunk_r2);
1141        let chunk = hash_join.next_unwrap_ready_chunk()?;
1142        assert_eq!(
1143            chunk,
1144            StreamChunk::from_pretty(
1145                " I I I I I I
1146                - 2 5 8 2 3 4
1147                + 2 5 8 2 1 7"
1148            )
1149        );
1150
1151        // push the 3rd right chunk
1152        tx_r.push_chunk(chunk_r3);
1153        let chunk = hash_join.next_unwrap_ready_chunk()?;
1154        assert_eq!(
1155            chunk,
1156            StreamChunk::from_pretty(
1157                " I I I I I I
1158                - 2 5 8 2 1 7
1159                + 2 5 8 2 3 3"
1160            )
1161        );
1162
1163        // push the 3rd left chunk
1164        tx_l.push_chunk(chunk_l3);
1165        let chunk = hash_join.next_unwrap_ready_chunk()?;
1166        assert_eq!(
1167            chunk,
1168            StreamChunk::from_pretty(
1169                " I I I I I I
1170                - 2 5 8 2 3 3"
1171            )
1172        );
1173
1174        // push the 4th left chunk
1175        tx_l.push_chunk(chunk_l4);
1176        let chunk = hash_join.next_unwrap_ready_chunk()?;
1177        assert_eq!(
1178            chunk,
1179            StreamChunk::from_pretty(
1180                " I I I I I I
1181                + 6 3 1 6 1 9
1182                + 6 4 1 6 1 9"
1183            )
1184        );
1185
1186        // push the 4th right chunk
1187        tx_r.push_chunk(chunk_r4);
1188        let chunk = hash_join.next_unwrap_ready_chunk()?;
1189        assert_eq!(
1190            chunk,
1191            StreamChunk::from_pretty(
1192                " I I I I I I
1193                - 6 3 1 6 1 9
1194                + 6 3 1 6 2 9
1195                - 6 4 1 6 1 9
1196                + 6 4 1 6 2 9"
1197            )
1198        );
1199
1200        Ok(())
1201    }
1202
1203    #[tokio::test]
1204    async fn test_as_of_left_outer_join() -> StreamExecutorResult<()> {
1205        let asof_desc = AsOfDesc {
1206            left_idx: 1,
1207            right_idx: 2,
1208            inequality_type: AsOfInequalityType::Ge,
1209        };
1210
1211        let chunk_l1 = StreamChunk::from_pretty(
1212            "  I I I
1213             + 1 4 7
1214             + 2 5 8
1215             + 3 6 9",
1216        );
1217        let chunk_l2 = StreamChunk::from_pretty(
1218            "  I I I
1219             + 3 8 1
1220             - 3 8 1",
1221        );
1222        let chunk_r1 = StreamChunk::from_pretty(
1223            "  I I I
1224             + 2 3 4
1225             + 2 2 5
1226             + 2 1 5
1227             + 6 1 8
1228             + 6 2 9",
1229        );
1230        let chunk_r2 = StreamChunk::from_pretty(
1231            "  I I I
1232             - 2 3 4
1233             - 2 1 5
1234             - 2 2 5",
1235        );
1236        let chunk_l3 = StreamChunk::from_pretty(
1237            "  I I I
1238             + 6 8 9",
1239        );
1240        let chunk_r3 = StreamChunk::from_pretty(
1241            "  I I I
1242             - 6 1 8",
1243        );
1244
1245        let (mut tx_l, mut tx_r, mut hash_join) =
1246            create_executor::<{ AsOfJoinType::LeftOuter }>(asof_desc).await;
1247
1248        // push the init barrier for left and right
1249        tx_l.push_barrier(test_epoch(1), false);
1250        tx_r.push_barrier(test_epoch(1), false);
1251        hash_join.next_unwrap_ready_barrier()?;
1252
1253        // push the 1st left chunk
1254        tx_l.push_chunk(chunk_l1);
1255        let chunk = hash_join.next_unwrap_ready_chunk()?;
1256        assert_eq!(
1257            chunk,
1258            StreamChunk::from_pretty(
1259                " I I I I I I
1260                + 1 4 7 . . .
1261                + 2 5 8 . . .
1262                + 3 6 9 . . ."
1263            )
1264        );
1265
1266        // push the init barrier for left and right
1267        tx_l.push_barrier(test_epoch(2), false);
1268        tx_r.push_barrier(test_epoch(2), false);
1269        hash_join.next_unwrap_ready_barrier()?;
1270
1271        // push the 2nd left chunk
1272        tx_l.push_chunk(chunk_l2);
1273        let chunk = hash_join.next_unwrap_ready_chunk()?;
1274        assert_eq!(
1275            chunk,
1276            StreamChunk::from_pretty(
1277                " I I I I I I
1278                + 3 8 1 . . .
1279                - 3 8 1 . . ."
1280            )
1281        );
1282
1283        // push the 1st right chunk
1284        tx_r.push_chunk(chunk_r1);
1285        let chunk = hash_join.next_unwrap_ready_chunk()?;
1286        assert_eq!(
1287            chunk,
1288            StreamChunk::from_pretty(
1289                " I I I I I I
1290                - 2 5 8 . . .
1291                + 2 5 8 2 3 4
1292                - 2 5 8 2 3 4
1293                + 2 5 8 2 2 5
1294                - 2 5 8 2 2 5
1295                + 2 5 8 2 1 5"
1296            )
1297        );
1298
1299        // push the 2nd right chunk
1300        tx_r.push_chunk(chunk_r2);
1301        let chunk = hash_join.next_unwrap_ready_chunk()?;
1302        assert_eq!(
1303            chunk,
1304            StreamChunk::from_pretty(
1305                " I I I I I I
1306                - 2 5 8 2 1 5
1307                + 2 5 8 2 2 5
1308                - 2 5 8 2 2 5
1309                + 2 5 8 . . ."
1310            )
1311        );
1312
1313        // push the 3rd left chunk
1314        tx_l.push_chunk(chunk_l3);
1315        let chunk = hash_join.next_unwrap_ready_chunk()?;
1316        assert_eq!(
1317            chunk,
1318            StreamChunk::from_pretty(
1319                " I I I I I I
1320                + 6 8 9 6 1 8"
1321            )
1322        );
1323
1324        // push the 3rd right chunk
1325        tx_r.push_chunk(chunk_r3);
1326        let chunk = hash_join.next_unwrap_ready_chunk()?;
1327        assert_eq!(
1328            chunk,
1329            StreamChunk::from_pretty(
1330                " I I I I I I
1331                - 6 8 9 6 1 8
1332                + 6 8 9 . . ."
1333            )
1334        );
1335        Ok(())
1336    }
1337}