risingwave_stream/executor/wrapper/
update_check.rs1use std::iter::once;
16use std::sync::Arc;
17
18use futures_async_stream::try_stream;
19use itertools::Itertools;
20use risingwave_common::array::Op;
21
22use crate::executor::error::StreamExecutorError;
23use crate::executor::{ExecutorInfo, Message, MessageStream};
24
25#[try_stream(ok = Message, error = StreamExecutorError)]
28pub async fn update_check(info: Arc<ExecutorInfo>, input: impl MessageStream) {
29 #[for_await]
30 for message in input {
31 let message = message?;
32
33 if let Message::Chunk(chunk) = &message {
34 for ((op1, row1), (op2, row2)) in once(None)
35 .chain(chunk.rows().map(Some))
36 .chain(once(None))
37 .map(|r| (r.unzip()))
38 .tuple_windows()
39 {
40 if (op1.is_none() && op2 == Some(Op::UpdateInsert)) || (op1 == Some(Op::UpdateDelete) && op2 != Some(Op::UpdateInsert))
42 {
43 panic!(
44 "update check failed on `{}`: expect U+ after U-:\n first row: {:?}\nsecond row: {:?}",
45 info.identity, row1, row2,
46 )
47 }
48 }
49 }
50
51 yield message;
52 }
53}
54
#[cfg(test)]
mod tests {
    use futures::{StreamExt, pin_mut};
    use risingwave_common::array::StreamChunk;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;

    use super::*;
    use crate::executor::test_utils::MockSource;

    /// Pushes `chunk` into a mock source, wraps the source's stream with `update_check`, and
    /// pulls the first message out of the checked stream, unwrapping it.
    ///
    /// Shared by every test below; tests expecting a rejection rely on the panic raised while
    /// the first message is being polled.
    async fn pull_first_checked(chunk: StreamChunk) {
        let (mut sender, mock) = MockSource::channel();
        let executor = mock.into_executor(Default::default(), vec![]);
        sender.push_chunk(chunk);

        let stream = update_check(executor.info().clone().into(), executor.execute());
        pin_mut!(stream);

        stream.next().await.unwrap().unwrap();
    }

    /// Two `U-` rows in a row: the first `U-` is not directly followed by a `U+`.
    #[should_panic]
    #[tokio::test]
    async fn test_not_next_to_each_other() {
        let chunk = StreamChunk::from_pretty(
            " I
            U- 114
            U- 514
            U+ 1919
            U+ 810",
        );
        pull_first_checked(chunk).await;
    }

    /// A chunk whose very first row is a `U+`.
    #[should_panic]
    #[tokio::test]
    async fn test_first_one_update_insert() {
        let chunk = StreamChunk::from_pretty(
            " I
            U+ 114",
        );
        pull_first_checked(chunk).await;
    }

    /// A chunk whose very last row is a `U-`.
    #[should_panic]
    #[tokio::test]
    async fn test_last_one_update_delete() {
        let chunk = StreamChunk::from_pretty(
            " I
            U- 114
            U+ 514
            U- 1919810",
        );
        pull_first_checked(chunk).await;
    }

    /// A default (empty) chunk must pass through without panicking.
    #[tokio::test]
    async fn test_empty_chunk() {
        pull_first_checked(StreamChunk::default()).await;
    }
}