risingwave_stream/executor/wrapper/
update_check.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::iter::once;
16use std::sync::Arc;
17
18use futures_async_stream::try_stream;
19use itertools::Itertools;
20use risingwave_common::array::Op;
21
22use crate::executor::error::StreamExecutorError;
23use crate::executor::{ExecutorInfo, Message, MessageStream};
24
25/// Streams wrapped by `update_check` will check whether the two rows of updates are next to each
26/// other.
27#[try_stream(ok = Message, error = StreamExecutorError)]
28pub async fn update_check(info: Arc<ExecutorInfo>, input: impl MessageStream) {
29    #[for_await]
30    for message in input {
31        let message = message?;
32
33        if let Message::Chunk(chunk) = &message {
34            for ((op1, row1), (op2, row2)) in once(None)
35                .chain(chunk.rows().map(Some))
36                .chain(once(None))
37                .map(|r| (r.unzip()))
38                .tuple_windows()
39            {
40                if (op1.is_none() && op2 == Some(Op::UpdateInsert)) // the first row is U+
41                    || (op1 == Some(Op::UpdateDelete) && op2 != Some(Op::UpdateInsert))
42                {
43                    panic!(
44                        "update check failed on `{}`: expect U+ after  U-:\n first row: {:?}\nsecond row: {:?}",
45                        info.identity, row1, row2,
46                    )
47                }
48            }
49        }
50
51        yield message;
52    }
53}
54
#[cfg(test)]
mod tests {
    use futures::{StreamExt, pin_mut};
    use risingwave_common::array::StreamChunk;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;

    use super::*;
    use crate::executor::test_utils::MockSource;

    /// Pushes `chunk` into a mock source, wraps the source with `update_check`, and pulls the
    /// first message out of the checked stream, unwrapping both the stream item and its result.
    async fn pull_first_checked(chunk: StreamChunk) {
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(Default::default(), vec![]);
        tx.push_chunk(chunk);

        let checked = update_check(source.info().clone().into(), source.execute());
        pin_mut!(checked);

        checked.next().await.unwrap().unwrap();
    }

    /// Two `U-` rows in a row, followed by two `U+` rows: the pairs are not adjacent.
    #[should_panic]
    #[tokio::test]
    async fn test_not_next_to_each_other() {
        let chunk = StreamChunk::from_pretty(
            "     I
            U-  114
            U-  514
            U+ 1919
            U+  810",
        );

        pull_first_checked(chunk).await; // should panic
    }

    /// A chunk that starts with `U+` has no `U-` in front of it.
    #[should_panic]
    #[tokio::test]
    async fn test_first_one_update_insert() {
        let chunk = StreamChunk::from_pretty(
            "     I
            U+  114",
        );

        pull_first_checked(chunk).await; // should panic
    }

    /// A chunk that ends with `U-` has no `U+` behind it.
    #[should_panic]
    #[tokio::test]
    async fn test_last_one_update_delete() {
        let chunk = StreamChunk::from_pretty(
            "        I
            U-     114
            U+     514
            U- 1919810",
        );

        pull_first_checked(chunk).await; // should panic
    }

    /// An empty chunk contains no updates and must pass the check.
    #[tokio::test]
    async fn test_empty_chunk() {
        pull_first_checked(StreamChunk::default()).await;
    }
}