risingwave_stream/executor/wrapper/
update_check.rs1use std::sync::Arc;
16
17use futures_async_stream::try_stream;
18use risingwave_common::array::stream_record::Record;
19use risingwave_common::row::RowExt;
20
21use crate::executor::error::StreamExecutorError;
22use crate::executor::{ExecutorInfo, Message, MessageStream};
23
24#[try_stream(ok = Message, error = StreamExecutorError)]
27pub async fn update_check(info: Arc<ExecutorInfo>, input: impl MessageStream) {
28 #[for_await]
29 for message in input {
30 let message = message?;
31
32 if let Message::Chunk(chunk) = &message {
33 for record in chunk.records() {
34 if let Record::Update { old_row, new_row } = record {
36 let old_pk = old_row.project(&info.stream_key);
37 let new_pk = new_row.project(&info.stream_key);
38 debug_assert_eq!(
39 old_pk,
40 new_pk,
41 "U- and U+ should have same stream key
42U- row: {}
43U- key: {}
44U+ row: {}
45U+ key: {}
46stream key indices: {:?}
47executor: {}",
48 old_row.display(),
49 old_pk.display(),
50 new_row.display(),
51 new_pk.display(),
52 info.stream_key,
53 info.identity
54 )
55 }
56 }
57 }
58
59 yield message;
60 }
61}
62
63#[cfg(test)]
64mod tests {
65 use futures::{StreamExt, pin_mut};
66 use risingwave_common::array::StreamChunk;
67 use risingwave_common::array::stream_chunk::StreamChunkTestExt;
68
69 use super::*;
70 use crate::executor::test_utils::MockSource;
71
72 #[should_panic]
73 #[tokio::test]
74 async fn test_not_next_to_each_other() {
75 let (mut tx, source) = MockSource::channel();
76 let source = source.into_executor(Default::default(), vec![]);
77 tx.push_chunk(StreamChunk::from_pretty(
78 " I
79 U- 114
80 U- 514
81 U+ 1919
82 U+ 810",
83 ));
84
85 let checked = update_check(source.info().clone().into(), source.execute());
86 pin_mut!(checked);
87
88 checked.next().await.unwrap().unwrap(); }
90
91 #[should_panic]
92 #[tokio::test]
93 async fn test_first_one_update_insert() {
94 let (mut tx, source) = MockSource::channel();
95 let source = source.into_executor(Default::default(), vec![]);
96 tx.push_chunk(StreamChunk::from_pretty(
97 " I
98 U+ 114",
99 ));
100
101 let checked = update_check(source.info().clone().into(), source.execute());
102 pin_mut!(checked);
103
104 checked.next().await.unwrap().unwrap(); }
106
107 #[should_panic]
108 #[tokio::test]
109 async fn test_last_one_update_delete() {
110 let (mut tx, source) = MockSource::channel();
111 let source = source.into_executor(Default::default(), vec![]);
112 tx.push_chunk(StreamChunk::from_pretty(
113 " I
114 U- 114
115 U+ 514
116 U- 1919810",
117 ));
118
119 let checked = update_check(source.info().clone().into(), source.execute());
120 pin_mut!(checked);
121
122 checked.next().await.unwrap().unwrap(); }
124
125 #[tokio::test]
126 async fn test_empty_chunk() {
127 let (mut tx, source) = MockSource::channel();
128 let source = source.into_executor(Default::default(), vec![]);
129 tx.push_chunk(StreamChunk::default());
130
131 let checked = update_check(source.info().clone().into(), source.execute());
132 pin_mut!(checked);
133
134 checked.next().await.unwrap().unwrap();
135 }
136}