Skip to main content

risingwave_stream/executor/lookup/
sides.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::pin;
16
17use anyhow::Context;
18use either::Either;
19use futures::future::select;
20use futures::{StreamExt, future};
21use futures_async_stream::try_stream;
22use risingwave_common::array::StreamChunk;
23use risingwave_common::bail;
24use risingwave_common::catalog::ColumnDesc;
25use risingwave_common::types::DataType;
26use risingwave_common::util::sort_util::ColumnOrder;
27use risingwave_storage::StateStore;
28use risingwave_storage::row_serde::value_serde::ValueRowSerde;
29
30use crate::common::table::state_table::ReplicatedStateTable;
31use crate::executor::error::StreamExecutorError;
32use crate::executor::{Barrier, BoxedMessageStream, Message, MessageStream};
33
34/// Join side of Lookup Executor's stream
35pub(crate) struct StreamJoinSide {
36    /// Indices of the join key columns
37    pub key_indices: Vec<usize>,
38
39    /// The primary key indices of this side, used for state store
40    pub pk_indices: Vec<usize>,
41
42    /// The date type of each columns to join on
43    pub col_types: Vec<DataType>,
44}
45
46impl std::fmt::Debug for StreamJoinSide {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("ArrangeJoinSide")
49            .field("key_indices", &self.key_indices)
50            .field("pk_indices", &self.pk_indices)
51            .field("col_types", &self.col_types)
52            .finish()
53    }
54}
55
56/// Join side of Arrange Executor's stream
57pub(crate) struct ArrangeJoinSide<S: StateStore, SD: ValueRowSerde> {
58    /// The primary key indices of this side, used for state store
59    pub pk_indices: Vec<usize>,
60
61    /// The datatype of columns in arrangement
62    pub col_types: Vec<DataType>,
63
64    /// The column descriptors of columns in arrangement
65    pub col_descs: Vec<ColumnDesc>,
66
67    /// Order rules of the arrangement (only join key is needed, pk should not be included, used
68    /// for lookup)
69    pub order_rules: Vec<ColumnOrder>,
70
71    /// Key indices for the join
72    ///
73    /// The key indices of the arrange side won't be used for the lookup process, but we still
74    /// record it here in case anyone would use it in the future.
75    pub key_indices: Vec<usize>,
76
77    /// Whether to join with the arrangement of the current epoch
78    pub use_current_epoch: bool,
79
80    pub state_table: ReplicatedStateTable<S, SD>,
81}
82
83/// Message from the `arrange_join_stream`.
84#[derive(Debug)]
85pub enum ArrangeMessage {
86    /// Arrangement sides' update in this epoch. There will be only one arrange batch message
87    /// within epoch. Once the executor receives an arrange batch message, it can start doing
88    /// joins.
89    ArrangeReady(Vec<StreamChunk>, Barrier),
90
91    /// There's a message from stream side.
92    Stream(StreamChunk),
93
94    /// Barrier (once every epoch).
95    Barrier(Barrier),
96}
97
98pub type BarrierAlignedMessage = Either<Message, Message>;
99
100#[try_stream(ok = Message, error = StreamExecutorError)]
101pub async fn poll_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
102    #[for_await]
103    for item in stream {
104        match item? {
105            Message::Watermark(_) => {
106                // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
107            }
108            c @ Message::Chunk(_) => yield c,
109            Message::Barrier(b) => {
110                if b.epoch != expected_barrier.epoch {
111                    return Err(StreamExecutorError::align_barrier(expected_barrier, b));
112                } else {
113                    yield Message::Barrier(b);
114                    break;
115                }
116            }
117        }
118    }
119}
120
121/// A biased barrier aligner which prefers message from the right side. Barrier message will be
122/// available for both left and right side, instead of being combined.
123#[try_stream(ok = BarrierAlignedMessage, error = StreamExecutorError)]
124pub async fn align_barrier(mut left: BoxedMessageStream, mut right: BoxedMessageStream) {
125    enum SideStatus {
126        LeftBarrier,
127        RightBarrier,
128    }
129
130    'outer: loop {
131        let (side_status, side_barrier) = 'inner: loop {
132            // Prefer right
133            let select_result = match select(right.next(), left.next()).await {
134                future::Either::Left(x) => future::Either::Right(x),
135                future::Either::Right(x) => future::Either::Left(x),
136            };
137            match select_result {
138                future::Either::Left((None, _)) => {
139                    // left stream end, passthrough right chunks
140                    while let Some(msg) = right.next().await {
141                        match msg? {
142                            w @ Message::Watermark(_) => yield Either::Left(w),
143                            c @ Message::Chunk(_) => yield Either::Left(c),
144                            Message::Barrier(_) => {
145                                bail!("right barrier received while left stream end");
146                            }
147                        }
148                    }
149                    break 'outer;
150                }
151                future::Either::Right((None, _)) => {
152                    // right stream end, passthrough left chunks
153                    while let Some(msg) = left.next().await {
154                        match msg? {
155                            w @ Message::Watermark(_) => yield Either::Right(w),
156                            c @ Message::Chunk(_) => yield Either::Right(c),
157                            Message::Barrier(_) => {
158                                bail!("left barrier received while right stream end");
159                            }
160                        }
161                    }
162                    break 'outer;
163                }
164                future::Either::Left((Some(msg), _)) => match msg? {
165                    w @ Message::Watermark(_) => yield Either::Left(w),
166                    c @ Message::Chunk(_) => yield Either::Left(c),
167                    Message::Barrier(b) => {
168                        yield Either::Left(Message::Barrier(b.clone()));
169                        break 'inner (SideStatus::LeftBarrier, b);
170                    }
171                },
172                future::Either::Right((Some(msg), _)) => match msg? {
173                    w @ Message::Watermark(_) => yield Either::Right(w),
174                    c @ Message::Chunk(_) => yield Either::Right(c),
175                    Message::Barrier(b) => {
176                        yield Either::Right(Message::Barrier(b.clone()));
177                        break 'inner (SideStatus::RightBarrier, b);
178                    }
179                },
180            }
181        };
182
183        match side_status {
184            SideStatus::LeftBarrier => {
185                #[for_await]
186                for item in poll_until_barrier(right.by_ref(), side_barrier) {
187                    yield Either::Right(item?);
188                }
189            }
190            SideStatus::RightBarrier => {
191                #[for_await]
192                for item in poll_until_barrier(left.by_ref(), side_barrier) {
193                    yield Either::Left(item?);
194                }
195            }
196        }
197    }
198}
199
200/// Join the stream with the previous stable snapshot of the arrangement.
201///
202/// For example, the executor will receive the following message sequence from
203/// `stream_lookup_arrange_prev_epoch`:
204///
205/// * `[Msg`] Barrier (prev = `[1`], current = `[2`])
206/// * `[Msg`] Stream (key = a)
207/// * `[Do`] lookup `a` in arrangement of epoch `[1`] (prev epoch)
208/// * `[Msg`] Arrangement (batch)
209/// * `[Msg`] Stream (key = b)
210/// * `[Do`] lookup `b` in arrangement of epoch `[1`] (prev epoch)
211/// * `[Do`] update cache with epoch `[2`]
212/// * Barrier (prev = `[2`], current = `[3`])
213/// * `[Msg`] Arrangement (batch)
214#[try_stream(ok = ArrangeMessage, error = StreamExecutorError)]
215pub async fn stream_lookup_arrange_prev_epoch(
216    stream: BoxedMessageStream,
217    arrangement: BoxedMessageStream,
218) {
219    let mut input = pin!(align_barrier(stream, arrangement));
220    let mut arrange_buf = vec![];
221    let mut stream_side_end = false;
222
223    loop {
224        let mut arrange_barrier = None;
225
226        while let Some(item) = input.next().await {
227            match item? {
228                Either::Left(Message::Chunk(msg)) => {
229                    // As prev epoch is already available, we can directly forward messages from the
230                    // stream side.
231                    yield ArrangeMessage::Stream(msg);
232                }
233                Either::Right(Message::Chunk(chunk)) => {
234                    // For message from the arrangement side, put it in a buf
235                    arrange_buf.push(chunk);
236                }
237                Either::Left(Message::Barrier(barrier)) => {
238                    yield ArrangeMessage::Barrier(barrier);
239                    stream_side_end = true;
240                }
241                Either::Right(Message::Barrier(barrier)) => {
242                    if stream_side_end {
243                        yield ArrangeMessage::ArrangeReady(
244                            std::mem::take(&mut arrange_buf),
245                            barrier,
246                        );
247                        stream_side_end = false;
248                    } else {
249                        arrange_barrier = Some(barrier);
250                        break;
251                    }
252                }
253                Either::Left(Message::Watermark(_)) => {
254                    // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
255                }
256                Either::Right(Message::Watermark(_)) => {
257                    // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
258                }
259            }
260        }
261
262        loop {
263            match input
264                .next()
265                .await
266                .context("unexpected close of barrier aligner")??
267            {
268                Either::Left(Message::Watermark(_)) => {
269                    // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
270                }
271                Either::Left(Message::Chunk(msg)) => yield ArrangeMessage::Stream(msg),
272                Either::Left(Message::Barrier(b)) => {
273                    yield ArrangeMessage::Barrier(b);
274                    break;
275                }
276                Either::Right(_) => unreachable!(),
277            }
278        }
279
280        yield ArrangeMessage::ArrangeReady(
281            std::mem::take(&mut arrange_buf),
282            arrange_barrier.take().unwrap(),
283        );
284    }
285}
286
287/// Join the stream with the current state of the arrangement.
288///
289/// For example, the executor will receive the following message sequence from
290/// `stream_lookup_arrange_this_epoch`:
291///
292/// * `[Msg`] Barrier (prev = `[1`], current = `[2`])
293/// * `[Msg`] Arrangement (batch)
294/// * `[Do`] update cache with epoch `[2`]
295/// * `[Msg`] Stream (key = a)
296/// * `[Do`] lookup `a` in arrangement of epoch `[2`] (current epoch)
297/// * Barrier (prev = `[2`], current = `[3`])
298#[try_stream(ok = ArrangeMessage, error = StreamExecutorError)]
299pub async fn stream_lookup_arrange_this_epoch(
300    stream: BoxedMessageStream,
301    arrangement: BoxedMessageStream,
302) {
303    let mut input = pin!(align_barrier(stream, arrangement));
304    let mut stream_buf = vec![];
305    let mut arrange_buf = vec![];
306
307    enum Status {
308        ArrangeReady,
309        StreamReady(Barrier),
310    }
311
312    loop {
313        let status = 'inner: loop {
314            match input
315                .next()
316                .await
317                .context("unexpected close of barrier aligner")??
318            {
319                Either::Left(Message::Chunk(msg)) => {
320                    // Should wait until arrangement from this epoch is available.
321                    stream_buf.push(msg);
322                }
323                Either::Right(Message::Chunk(chunk)) => {
324                    // For message from the arrangement side, put it in buf.
325                    arrange_buf.push(chunk);
326                }
327                Either::Left(Message::Barrier(barrier)) => {
328                    break 'inner Status::StreamReady(barrier);
329                }
330                Either::Right(Message::Barrier(barrier)) => {
331                    yield ArrangeMessage::ArrangeReady(std::mem::take(&mut arrange_buf), barrier);
332                    for msg in std::mem::take(&mut stream_buf) {
333                        yield ArrangeMessage::Stream(msg);
334                    }
335                    break 'inner Status::ArrangeReady;
336                }
337                Either::Left(Message::Watermark(_)) => {
338                    // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
339                }
340                Either::Right(Message::Watermark(_)) => {
341                    // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
342                }
343            }
344        };
345        match status {
346            // Arrangement is ready, but still stream message in this epoch -- we directly forward
347            // message from the stream side.
348            Status::ArrangeReady => loop {
349                match input
350                    .next()
351                    .await
352                    .context("unexpected close of barrier aligner")??
353                {
354                    Either::Left(Message::Chunk(msg)) => yield ArrangeMessage::Stream(msg),
355                    Either::Left(Message::Barrier(b)) => {
356                        yield ArrangeMessage::Barrier(b);
357                        break;
358                    }
359                    Either::Left(Message::Watermark(_)) => {
360                        // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
361                    }
362                    Either::Right(Message::Watermark(_)) => {
363                        // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
364                    }
365                    Either::Right(_) => unreachable!(),
366                }
367            },
368            // Stream is done in this epoch, but arrangement is not ready -- we wait for the
369            // arrangement ready and pipe out all buffered stream messages.
370            Status::StreamReady(stream_barrier) => loop {
371                match input
372                    .next()
373                    .await
374                    .context("unexpected close of barrier aligner")??
375                {
376                    Either::Left(_) => unreachable!(),
377                    Either::Right(Message::Chunk(chunk)) => {
378                        arrange_buf.push(chunk);
379                    }
380                    Either::Right(Message::Barrier(barrier)) => {
381                        yield ArrangeMessage::ArrangeReady(
382                            std::mem::take(&mut arrange_buf),
383                            barrier,
384                        );
385                        for msg in std::mem::take(&mut stream_buf) {
386                            yield ArrangeMessage::Stream(msg);
387                        }
388                        yield ArrangeMessage::Barrier(stream_barrier);
389                        break;
390                    }
391                    Either::Right(Message::Watermark(_)) => {
392                        // TODO: https://github.com/risingwavelabs/risingwave/issues/6042
393                    }
394                }
395            },
396        }
397    }
398}
399
400impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for ArrangeJoinSide<S, SD> {
401    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
402        f.debug_struct("ArrangeJoinSide")
403            .field("pk_indices", &self.pk_indices)
404            .field("col_types", &self.col_types)
405            .field("col_descs", &self.col_descs)
406            .field("order_rules", &self.order_rules)
407            .field("use_current_epoch", &self.use_current_epoch)
408            .finish()
409    }
410}
411
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use risingwave_common::array::{StreamChunk, StreamChunkTestExt};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::test_epoch;

    use crate::executor::StreamExecutorResult;
    use crate::executor::lookup::sides::stream_lookup_arrange_this_epoch;
    use crate::executor::test_utils::MockSource;

    /// The arrangement input is closed before it sends anything (as may happen during
    /// recovery). Pulling from the joined stream must surface an error instead of panicking.
    #[tokio::test]
    async fn test_stream_lookup_arrange_this_epoch() -> StreamExecutorResult<()> {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64), // join key
                Field::unnamed(DataType::Int64),
            ],
        };

        let (mut left_tx, left_source) = MockSource::channel();
        let left_executor = left_source
            .stop_on_finish(false)
            .into_executor(schema.clone(), vec![1]);
        let (right_tx, right_source) = MockSource::channel();
        let right_executor = right_source
            .stop_on_finish(false)
            .into_executor(schema, vec![1]);

        let mut joined = stream_lookup_arrange_this_epoch(
            left_executor.execute(),
            right_executor.execute(),
        )
        .boxed();

        // Simulate recovery: the arrangement side disappears immediately.
        drop(right_tx);

        left_tx.push_barrier(test_epoch(1), false);
        left_tx.push_chunk(StreamChunk::from_pretty(
            "  I I
             + 1 1",
        ));

        // It should throw an error instead of panic.
        let first = joined.next().await.unwrap();
        assert!(first.is_err());

        Ok(())
    }
}