risingwave_stream/executor/wrapper/
update_check.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures_async_stream::try_stream;
18use risingwave_common::array::stream_record::Record;
19use risingwave_common::row::RowExt;
20
21use crate::executor::error::StreamExecutorError;
22use crate::executor::{ExecutorInfo, Message, MessageStream};
23
24/// Streams wrapped by `update_check` will check whether the two rows of updates are next to each
25/// other.
26#[try_stream(ok = Message, error = StreamExecutorError)]
27pub async fn update_check(info: Arc<ExecutorInfo>, input: impl MessageStream) {
28    #[for_await]
29    for message in input {
30        let message = message?;
31
32        if let Message::Chunk(chunk) = &message {
33            for record in chunk.records() {
34                // `chunk.records()` will check U-/U+ pairing
35                if let Record::Update { old_row, new_row } = record {
36                    let old_pk = old_row.project(&info.stream_key);
37                    let new_pk = new_row.project(&info.stream_key);
38                    debug_assert_eq!(
39                        old_pk,
40                        new_pk,
41                        "U- and U+ should have same stream key
42U- row: {}
43U- key: {}
44U+ row: {}
45U+ key: {}
46stream key indices: {:?}
47executor: {}",
48                        old_row.display(),
49                        old_pk.display(),
50                        new_row.display(),
51                        new_pk.display(),
52                        info.stream_key,
53                        info.identity
54                    )
55                }
56            }
57        }
58
59        yield message;
60    }
61}
62
#[cfg(test)]
mod tests {
    use futures::{StreamExt, pin_mut};
    use risingwave_common::array::StreamChunk;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;

    use super::*;
    use crate::executor::test_utils::MockSource;

    /// Pushes `chunk` into a mock source with a default schema and an empty stream key, wraps
    /// the source with `update_check`, and polls the first message, unwrapping it.
    async fn poll_first_checked(chunk: StreamChunk) {
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(Default::default(), vec![]);
        tx.push_chunk(chunk);

        let checked = update_check(source.info().clone().into(), source.execute());
        pin_mut!(checked);

        checked.next().await.unwrap().unwrap();
    }

    /// Two `U-` rows followed by two `U+` rows: the halves of each update are not adjacent.
    #[should_panic]
    #[tokio::test]
    async fn test_not_next_to_each_other() {
        let chunk = StreamChunk::from_pretty(
            "     I
            U-  114
            U-  514
            U+ 1919
            U+  810",
        );

        poll_first_checked(chunk).await; // should panic
    }

    /// A chunk that starts with a `U+` row that has no preceding `U-`.
    #[should_panic]
    #[tokio::test]
    async fn test_first_one_update_insert() {
        let chunk = StreamChunk::from_pretty(
            "     I
            U+  114",
        );

        poll_first_checked(chunk).await; // should panic
    }

    /// A chunk that ends with a `U-` row that has no following `U+`.
    #[should_panic]
    #[tokio::test]
    async fn test_last_one_update_delete() {
        let chunk = StreamChunk::from_pretty(
            "        I
            U-     114
            U+     514
            U- 1919810",
        );

        poll_first_checked(chunk).await; // should panic
    }

    /// An empty chunk contains no records and must pass through without panicking.
    #[tokio::test]
    async fn test_empty_chunk() {
        poll_first_checked(StreamChunk::default()).await;
    }
}