risingwave_common/array/
stream_record.rs1use std::convert::Infallible;
16
17use auto_enums::auto_enum;
18
19use super::StreamChunk;
20use crate::array::Op;
21use crate::row::Row;
22use crate::types::DataType;
23
/// Kind of a stream record, without any row data attached.
///
/// The variants mirror those of [`Record`]; see [`RecordType::ops`] for the
/// [`Op`]s each kind expands to.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum RecordType {
    /// Expands to a single `Op::Insert`.
    Insert,
    /// Expands to a single `Op::Delete`.
    Delete,
    /// Expands to the pair `Op::UpdateDelete`, `Op::UpdateInsert`.
    Update,
}
31
32impl RecordType {
33 pub fn ops(self) -> &'static [Op] {
35 match self {
36 RecordType::Insert => &[Op::Insert],
37 RecordType::Delete => &[Op::Delete],
38 RecordType::Update => &[Op::UpdateDelete, Op::UpdateInsert],
39 }
40 }
41}
42
/// A single change in a stream, generic over the row representation `R`.
///
/// Two records compare equal when they are the same variant and all of their
/// rows compare equal (available whenever `R: PartialEq`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Record<R> {
    /// An insertion, carrying only the new row.
    Insert { new_row: R },
    /// A deletion, carrying only the old row.
    Delete { old_row: R },
    /// An update, carrying both the old row and the new row.
    Update { old_row: R, new_row: R },
}
50
51impl<R> Record<R> {
52 #[auto_enum(Iterator)]
54 pub fn into_rows(self) -> impl Iterator<Item = (Op, R)> {
55 match self {
56 Record::Insert { new_row } => std::iter::once((Op::Insert, new_row)),
57 Record::Delete { old_row } => std::iter::once((Op::Delete, old_row)),
58 Record::Update { old_row, new_row } => {
59 [(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)].into_iter()
60 }
61 }
62 }
63
64 pub fn to_record_type(&self) -> RecordType {
66 match self {
67 Record::Insert { .. } => RecordType::Insert,
68 Record::Delete { .. } => RecordType::Delete,
69 Record::Update { .. } => RecordType::Update,
70 }
71 }
72
73 pub fn as_ref(&self) -> Record<&R> {
75 match self {
76 Record::Insert { new_row } => Record::Insert { new_row },
77 Record::Delete { old_row } => Record::Delete { old_row },
78 Record::Update { old_row, new_row } => Record::Update { old_row, new_row },
79 }
80 }
81
82 pub fn into_upsert(self) -> Self {
85 match self {
86 Record::Insert { new_row } => Record::Insert { new_row },
87 Record::Delete { old_row } => Record::Delete { old_row },
88 Record::Update { new_row, .. } => Record::Insert { new_row },
89 }
90 }
91
92 pub fn try_map<R2, E>(self, f: impl Fn(R) -> Result<R2, E>) -> Result<Record<R2>, E> {
94 Ok(match self {
95 Record::Insert { new_row } => Record::Insert {
96 new_row: f(new_row)?,
97 },
98 Record::Delete { old_row } => Record::Delete {
99 old_row: f(old_row)?,
100 },
101 Record::Update { old_row, new_row } => Record::Update {
102 old_row: f(old_row)?,
103 new_row: f(new_row)?,
104 },
105 })
106 }
107
108 pub fn map<R2>(self, f: impl Fn(R) -> R2) -> Record<R2> {
110 let Ok(record) = self.try_map::<R2, Infallible>(|row| Ok(f(row)));
111 record
112 }
113}
114
115impl<R: Row> Record<R> {
116 pub fn to_stream_chunk(&self, data_types: &[DataType]) -> StreamChunk {
118 match self {
119 Record::Insert { new_row } => {
120 StreamChunk::from_rows(&[(Op::Insert, new_row)], data_types)
121 }
122 Record::Delete { old_row } => {
123 StreamChunk::from_rows(&[(Op::Delete, old_row)], data_types)
124 }
125 Record::Update { old_row, new_row } => StreamChunk::from_rows(
126 &[(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)],
127 data_types,
128 ),
129 }
130 }
131}
132
#[cfg(test)]
mod tests {
    use super::*;
    use crate::row::OwnedRow;
    use crate::test_prelude::StreamChunkTestExt;

    /// Builds a one-column row holding an `i32` value.
    fn int32_row(value: i32) -> OwnedRow {
        OwnedRow::new(vec![Some(value.into())])
    }

    /// Builds a one-column row holding an `i64` value.
    fn int64_row(value: i64) -> OwnedRow {
        OwnedRow::new(vec![Some(value.into())])
    }

    /// Each variant must yield its rows with the matching ops, in order.
    #[test]
    fn test_into_rows() {
        let inserted: Vec<_> = Record::Insert {
            new_row: int32_row(1),
        }
        .into_rows()
        .collect();
        assert_eq!(inserted, vec![(Op::Insert, int32_row(1))]);

        let deleted: Vec<_> = Record::Delete {
            old_row: int32_row(1),
        }
        .into_rows()
        .collect();
        assert_eq!(deleted, vec![(Op::Delete, int32_row(1))]);

        let updated: Vec<_> = Record::Update {
            old_row: int32_row(1),
            new_row: int32_row(2),
        }
        .into_rows()
        .collect();
        assert_eq!(
            updated,
            vec![
                (Op::UpdateDelete, int32_row(1)),
                (Op::UpdateInsert, int32_row(2)),
            ]
        );
    }

    /// Each variant must produce the chunk described by the pretty-printed form.
    #[test]
    fn test_to_stream_chunk() {
        let schema = [DataType::Int64];

        let insert = Record::Insert {
            new_row: int64_row(1),
        };
        assert_eq!(
            insert.to_stream_chunk(&schema),
            StreamChunk::from_pretty(" I\n + 1")
        );

        let delete = Record::Delete {
            old_row: int64_row(1),
        };
        assert_eq!(
            delete.to_stream_chunk(&schema),
            StreamChunk::from_pretty(" I\n - 1")
        );

        let update = Record::Update {
            old_row: int64_row(1),
            new_row: int64_row(2),
        };
        assert_eq!(
            update.to_stream_chunk(&schema),
            StreamChunk::from_pretty(" I\n U- 1\n U+ 2")
        );
    }
}