risingwave_stream/executor/lookup/
sides.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::pin;
16
17use anyhow::Context;
18use either::Either;
19use futures::future::select;
20use futures::{StreamExt, future};
21use futures_async_stream::try_stream;
22use risingwave_common::array::StreamChunk;
23use risingwave_common::bail;
24use risingwave_common::catalog::ColumnDesc;
25use risingwave_common::types::DataType;
26use risingwave_common::util::sort_util::ColumnOrder;
27use risingwave_storage::StateStore;
28use risingwave_storage::table::batch_table::BatchTable;
29
30use crate::executor::error::StreamExecutorError;
31use crate::executor::{Barrier, BoxedMessageStream, Executor, Message, MessageStream};
32
/// Join side of Lookup Executor's stream.
pub(crate) struct StreamJoinSide {
    /// Indices of the join key columns.
    pub key_indices: Vec<usize>,

    /// The primary key indices of this side, used for state store.
    pub pk_indices: Vec<usize>,

    /// The data type of each column to join on.
    pub col_types: Vec<DataType>,
}
44
45impl std::fmt::Debug for StreamJoinSide {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("ArrangeJoinSide")
48            .field("key_indices", &self.key_indices)
49            .field("pk_indices", &self.pk_indices)
50            .field("col_types", &self.col_types)
51            .finish()
52    }
53}
54
/// Join side of Arrange Executor's stream.
pub(crate) struct ArrangeJoinSide<S: StateStore> {
    /// The primary key indices of this side, used for state store.
    pub pk_indices: Vec<usize>,

    /// The data type of each column in the arrangement.
    pub col_types: Vec<DataType>,

    /// The column descriptors of the columns in the arrangement.
    pub col_descs: Vec<ColumnDesc>,

    /// Order rules of the arrangement, used for lookup.
    ///
    /// Only the join key is needed here; the pk should not be included.
    pub order_rules: Vec<ColumnOrder>,

    /// Key indices for the join.
    ///
    /// The key indices of the arrange side won't be used for the lookup process, but we still
    /// record them here in case anyone would use them in the future.
    pub key_indices: Vec<usize>,

    /// Whether to join with the arrangement of the current epoch.
    ///
    /// NOTE(review): presumably selects between `stream_lookup_arrange_this_epoch` (`true`) and
    /// `stream_lookup_arrange_prev_epoch` (`false`); confirm at the construction site.
    pub use_current_epoch: bool,

    /// Table handle over the arrangement in state store `S`.
    ///
    /// NOTE(review): presumably the table that lookups read from; confirm at the use site.
    pub batch_table: BatchTable<S>,
}
81
/// Message produced by [`stream_lookup_arrange_prev_epoch`] and
/// [`stream_lookup_arrange_this_epoch`].
#[derive(Debug)]
pub enum ArrangeMessage {
    /// The arrangement side's updates in this epoch, together with the arrangement side's
    /// barrier that closed them. There will be only one such message within an epoch. Once the
    /// executor receives it, it can start doing joins.
    ArrangeReady(Vec<StreamChunk>, Barrier),

    /// A chunk from the stream side.
    Stream(StreamChunk),

    /// The stream side's barrier (once every epoch).
    Barrier(Barrier),
}
96
/// Message yielded by [`align_barrier`]. `Either::Left` is meant to carry messages from its
/// `left` input and `Either::Right` messages from its `right` input. In the lookup functions of
/// this module, `left` is the stream side and `right` is the arrangement side.
pub type BarrierAlignedMessage = Either<Message, Message>;
98
99#[try_stream(ok = Message, error = StreamExecutorError)]
100pub async fn poll_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
101    #[for_await]
102    for item in stream {
103        match item? {
104            Message::Watermark(_) => {
105                // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
106            }
107            c @ Message::Chunk(_) => yield c,
108            Message::Barrier(b) => {
109                if b.epoch != expected_barrier.epoch {
110                    return Err(StreamExecutorError::align_barrier(expected_barrier, b));
111                } else {
112                    yield Message::Barrier(b);
113                    break;
114                }
115            }
116        }
117    }
118}
119
120/// A biased barrier aligner which prefers message from the right side. Barrier message will be
121/// available for both left and right side, instead of being combined.
122#[try_stream(ok = BarrierAlignedMessage, error = StreamExecutorError)]
123pub async fn align_barrier(mut left: BoxedMessageStream, mut right: BoxedMessageStream) {
124    enum SideStatus {
125        LeftBarrier,
126        RightBarrier,
127    }
128
129    'outer: loop {
130        let (side_status, side_barrier) = 'inner: loop {
131            // Prefer right
132            let select_result = match select(right.next(), left.next()).await {
133                future::Either::Left(x) => future::Either::Right(x),
134                future::Either::Right(x) => future::Either::Left(x),
135            };
136            match select_result {
137                future::Either::Left((None, _)) => {
138                    // left stream end, passthrough right chunks
139                    while let Some(msg) = right.next().await {
140                        match msg? {
141                            w @ Message::Watermark(_) => yield Either::Left(w),
142                            c @ Message::Chunk(_) => yield Either::Left(c),
143                            Message::Barrier(_) => {
144                                bail!("right barrier received while left stream end");
145                            }
146                        }
147                    }
148                    break 'outer;
149                }
150                future::Either::Right((None, _)) => {
151                    // right stream end, passthrough left chunks
152                    while let Some(msg) = left.next().await {
153                        match msg? {
154                            w @ Message::Watermark(_) => yield Either::Right(w),
155                            c @ Message::Chunk(_) => yield Either::Right(c),
156                            Message::Barrier(_) => {
157                                bail!("left barrier received while right stream end");
158                            }
159                        }
160                    }
161                    break 'outer;
162                }
163                future::Either::Left((Some(msg), _)) => match msg? {
164                    w @ Message::Watermark(_) => yield Either::Left(w),
165                    c @ Message::Chunk(_) => yield Either::Left(c),
166                    Message::Barrier(b) => {
167                        yield Either::Left(Message::Barrier(b.clone()));
168                        break 'inner (SideStatus::LeftBarrier, b);
169                    }
170                },
171                future::Either::Right((Some(msg), _)) => match msg? {
172                    w @ Message::Watermark(_) => yield Either::Right(w),
173                    c @ Message::Chunk(_) => yield Either::Right(c),
174                    Message::Barrier(b) => {
175                        yield Either::Right(Message::Barrier(b.clone()));
176                        break 'inner (SideStatus::RightBarrier, b);
177                    }
178                },
179            }
180        };
181
182        match side_status {
183            SideStatus::LeftBarrier => {
184                #[for_await]
185                for item in poll_until_barrier(right.by_ref(), side_barrier) {
186                    yield Either::Right(item?);
187                }
188            }
189            SideStatus::RightBarrier => {
190                #[for_await]
191                for item in poll_until_barrier(left.by_ref(), side_barrier) {
192                    yield Either::Left(item?);
193                }
194            }
195        }
196    }
197}
198
199/// Join the stream with the previous stable snapshot of the arrangement.
200///
201/// For example, the executor will receive the following message sequence from
202/// `stream_lookup_arrange_prev_epoch`:
203///
204/// * `[Msg`] Barrier (prev = `[1`], current = `[2`])
205/// * `[Msg`] Stream (key = a)
206/// * `[Do`] lookup `a` in arrangement of epoch `[1`] (prev epoch)
207/// * `[Msg`] Arrangement (batch)
208/// * `[Msg`] Stream (key = b)
209/// * `[Do`] lookup `b` in arrangement of epoch `[1`] (prev epoch)
210/// * `[Do`] update cache with epoch `[2`]
211/// * Barrier (prev = `[2`], current = `[3`])
212/// * `[Msg`] Arrangement (batch)
213#[try_stream(ok = ArrangeMessage, error = StreamExecutorError)]
214pub async fn stream_lookup_arrange_prev_epoch(stream: Executor, arrangement: Executor) {
215    let mut input = pin!(align_barrier(stream.execute(), arrangement.execute()));
216    let mut arrange_buf = vec![];
217    let mut stream_side_end = false;
218
219    loop {
220        let mut arrange_barrier = None;
221
222        while let Some(item) = input.next().await {
223            match item? {
224                Either::Left(Message::Chunk(msg)) => {
225                    // As prev epoch is already available, we can directly forward messages from the
226                    // stream side.
227                    yield ArrangeMessage::Stream(msg);
228                }
229                Either::Right(Message::Chunk(chunk)) => {
230                    // For message from the arrangement side, put it in a buf
231                    arrange_buf.push(chunk);
232                }
233                Either::Left(Message::Barrier(barrier)) => {
234                    yield ArrangeMessage::Barrier(barrier);
235                    stream_side_end = true;
236                }
237                Either::Right(Message::Barrier(barrier)) => {
238                    if stream_side_end {
239                        yield ArrangeMessage::ArrangeReady(
240                            std::mem::take(&mut arrange_buf),
241                            barrier,
242                        );
243                        stream_side_end = false;
244                    } else {
245                        arrange_barrier = Some(barrier);
246                        break;
247                    }
248                }
249                Either::Left(Message::Watermark(_)) => {
250                    // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
251                }
252                Either::Right(Message::Watermark(_)) => {
253                    // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
254                }
255            }
256        }
257
258        loop {
259            match input
260                .next()
261                .await
262                .context("unexpected close of barrier aligner")??
263            {
264                Either::Left(Message::Watermark(_)) => {
265                    // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
266                }
267                Either::Left(Message::Chunk(msg)) => yield ArrangeMessage::Stream(msg),
268                Either::Left(Message::Barrier(b)) => {
269                    yield ArrangeMessage::Barrier(b);
270                    break;
271                }
272                Either::Right(_) => unreachable!(),
273            }
274        }
275
276        yield ArrangeMessage::ArrangeReady(
277            std::mem::take(&mut arrange_buf),
278            arrange_barrier.take().unwrap(),
279        );
280    }
281}
282
283/// Join the stream with the current state of the arrangement.
284///
285/// For example, the executor will receive the following message sequence from
286/// `stream_lookup_arrange_this_epoch`:
287///
288/// * `[Msg`] Barrier (prev = `[1`], current = `[2`])
289/// * `[Msg`] Arrangement (batch)
290/// * `[Do`] update cache with epoch `[2`]
291/// * `[Msg`] Stream (key = a)
292/// * `[Do`] lookup `a` in arrangement of epoch `[2`] (current epoch)
293/// * Barrier (prev = `[2`], current = `[3`])
294#[try_stream(ok = ArrangeMessage, error = StreamExecutorError)]
295pub async fn stream_lookup_arrange_this_epoch(stream: Executor, arrangement: Executor) {
296    let mut input = pin!(align_barrier(stream.execute(), arrangement.execute()));
297    let mut stream_buf = vec![];
298    let mut arrange_buf = vec![];
299
300    enum Status {
301        ArrangeReady,
302        StreamReady(Barrier),
303    }
304
305    loop {
306        let status = 'inner: loop {
307            match input
308                .next()
309                .await
310                .context("unexpected close of barrier aligner")??
311            {
312                Either::Left(Message::Chunk(msg)) => {
313                    // Should wait until arrangement from this epoch is available.
314                    stream_buf.push(msg);
315                }
316                Either::Right(Message::Chunk(chunk)) => {
317                    // For message from the arrangement side, put it in buf.
318                    arrange_buf.push(chunk);
319                }
320                Either::Left(Message::Barrier(barrier)) => {
321                    break 'inner Status::StreamReady(barrier);
322                }
323                Either::Right(Message::Barrier(barrier)) => {
324                    yield ArrangeMessage::ArrangeReady(std::mem::take(&mut arrange_buf), barrier);
325                    for msg in std::mem::take(&mut stream_buf) {
326                        yield ArrangeMessage::Stream(msg);
327                    }
328                    break 'inner Status::ArrangeReady;
329                }
330                Either::Left(Message::Watermark(_)) => {
331                    // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
332                }
333                Either::Right(Message::Watermark(_)) => {
334                    // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
335                }
336            }
337        };
338        match status {
339            // Arrangement is ready, but still stream message in this epoch -- we directly forward
340            // message from the stream side.
341            Status::ArrangeReady => loop {
342                match input
343                    .next()
344                    .await
345                    .context("unexpected close of barrier aligner")??
346                {
347                    Either::Left(Message::Chunk(msg)) => yield ArrangeMessage::Stream(msg),
348                    Either::Left(Message::Barrier(b)) => {
349                        yield ArrangeMessage::Barrier(b);
350                        break;
351                    }
352                    Either::Left(Message::Watermark(_)) => {
353                        // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
354                    }
355                    Either::Right(Message::Watermark(_)) => {
356                        // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
357                    }
358                    Either::Right(_) => unreachable!(),
359                }
360            },
361            // Stream is done in this epoch, but arrangement is not ready -- we wait for the
362            // arrangement ready and pipe out all buffered stream messages.
363            Status::StreamReady(stream_barrier) => loop {
364                match input
365                    .next()
366                    .await
367                    .context("unexpected close of barrier aligner")??
368                {
369                    Either::Left(_) => unreachable!(),
370                    Either::Right(Message::Chunk(chunk)) => {
371                        arrange_buf.push(chunk);
372                    }
373                    Either::Right(Message::Barrier(barrier)) => {
374                        yield ArrangeMessage::ArrangeReady(
375                            std::mem::take(&mut arrange_buf),
376                            barrier,
377                        );
378                        for msg in std::mem::take(&mut stream_buf) {
379                            yield ArrangeMessage::Stream(msg);
380                        }
381                        yield ArrangeMessage::Barrier(stream_barrier);
382                        break;
383                    }
384                    Either::Right(Message::Watermark(_)) => {
385                        // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
386                    }
387                }
388            },
389        }
390    }
391}
392
393impl<S: StateStore> std::fmt::Debug for ArrangeJoinSide<S> {
394    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
395        f.debug_struct("ArrangeJoinSide")
396            .field("pk_indices", &self.pk_indices)
397            .field("col_types", &self.col_types)
398            .field("col_descs", &self.col_descs)
399            .field("order_rules", &self.order_rules)
400            .field("use_current_epoch", &self.use_current_epoch)
401            .finish()
402    }
403}
404
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use risingwave_common::array::{StreamChunk, StreamChunkTestExt};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::test_epoch;

    use crate::executor::StreamExecutorResult;
    use crate::executor::lookup::sides::stream_lookup_arrange_this_epoch;
    use crate::executor::test_utils::MockSource;

    /// If the arrangement side goes away before sending anything, the aligned lookup stream
    /// must report an error rather than panic.
    #[tokio::test]
    async fn test_stream_lookup_arrange_this_epoch() -> StreamExecutorResult<()> {
        // Both inputs share a two-column schema.
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64), // join key
                Field::unnamed(DataType::Int64),
            ],
        };

        let (mut stream_tx, stream_source) = MockSource::channel();
        let stream_side = stream_source
            .stop_on_finish(false)
            .into_executor(schema.clone(), vec![1]);

        let (arrange_tx, arrange_source) = MockSource::channel();
        let arrange_side = arrange_source
            .stop_on_finish(false)
            .into_executor(schema, vec![1]);

        let mut lookup = stream_lookup_arrange_this_epoch(stream_side, arrange_side).boxed();

        // Simulate recovery test: the arrangement side closes before producing any message.
        drop(arrange_tx);

        stream_tx.push_barrier(test_epoch(1), false);

        let stream_chunk = StreamChunk::from_pretty(
            "  I I
             + 1 1",
        );
        stream_tx.push_chunk(stream_chunk);

        // It should throw an error instead of panic.
        let first = lookup.next().await.unwrap();
        first.unwrap_err();

        Ok(())
    }
}