risingwave_common/array/
stream_record.rs1use std::convert::Infallible;
16
17use auto_enums::auto_enum;
18
19use super::StreamChunk;
20use crate::array::Op;
21use crate::row::Row;
22use crate::types::DataType;
23
24#[derive(Debug, Copy, Clone)]
26pub enum RecordType {
27 Insert,
28 Delete,
29 Update,
30}
31
32impl RecordType {
33 pub fn ops(self) -> &'static [Op] {
35 match self {
36 RecordType::Insert => &[Op::Insert],
37 RecordType::Delete => &[Op::Delete],
38 RecordType::Update => &[Op::UpdateDelete, Op::UpdateInsert],
39 }
40 }
41}
42
43#[derive(Debug, Clone, Copy)]
45pub enum Record<R> {
46 Insert { new_row: R },
47 Delete { old_row: R },
48 Update { old_row: R, new_row: R },
49}
50
51impl<R> Record<R> {
52 #[auto_enum(Iterator)]
54 pub fn into_rows(self) -> impl Iterator<Item = (Op, R)> {
55 match self {
56 Record::Insert { new_row } => std::iter::once((Op::Insert, new_row)),
57 Record::Delete { old_row } => std::iter::once((Op::Delete, old_row)),
58 Record::Update { old_row, new_row } => {
59 [(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)].into_iter()
60 }
61 }
62 }
63
64 pub fn to_record_type(&self) -> RecordType {
66 match self {
67 Record::Insert { .. } => RecordType::Insert,
68 Record::Delete { .. } => RecordType::Delete,
69 Record::Update { .. } => RecordType::Update,
70 }
71 }
72
73 pub fn as_ref(&self) -> Record<&R> {
75 match self {
76 Record::Insert { new_row } => Record::Insert { new_row },
77 Record::Delete { old_row } => Record::Delete { old_row },
78 Record::Update { old_row, new_row } => Record::Update { old_row, new_row },
79 }
80 }
81
82 pub fn try_map<R2, E>(self, f: impl Fn(R) -> Result<R2, E>) -> Result<Record<R2>, E> {
84 Ok(match self {
85 Record::Insert { new_row } => Record::Insert {
86 new_row: f(new_row)?,
87 },
88 Record::Delete { old_row } => Record::Delete {
89 old_row: f(old_row)?,
90 },
91 Record::Update { old_row, new_row } => Record::Update {
92 old_row: f(old_row)?,
93 new_row: f(new_row)?,
94 },
95 })
96 }
97
98 pub fn map<R2>(self, f: impl Fn(R) -> R2) -> Record<R2> {
100 let Ok(record) = self.try_map::<R2, Infallible>(|row| Ok(f(row)));
101 record
102 }
103}
104
105impl<R: Row> Record<R> {
106 pub fn to_stream_chunk(&self, data_types: &[DataType]) -> StreamChunk {
108 match self {
109 Record::Insert { new_row } => {
110 StreamChunk::from_rows(&[(Op::Insert, new_row)], data_types)
111 }
112 Record::Delete { old_row } => {
113 StreamChunk::from_rows(&[(Op::Delete, old_row)], data_types)
114 }
115 Record::Update { old_row, new_row } => StreamChunk::from_rows(
116 &[(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)],
117 data_types,
118 ),
119 }
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use super::*;
126 use crate::row::OwnedRow;
127 use crate::test_prelude::StreamChunkTestExt;
128
129 #[test]
130 fn test_into_rows() {
131 let record = Record::Insert {
132 new_row: OwnedRow::new(vec![Some(1.into())]),
133 };
134 let rows: Vec<_> = record.into_rows().collect();
135 assert_eq!(rows.len(), 1);
136 assert_eq!(rows[0].0, Op::Insert);
137 assert_eq!(rows[0].1, OwnedRow::new(vec![Some(1.into())]));
138
139 let record = Record::Delete {
140 old_row: OwnedRow::new(vec![Some(1.into())]),
141 };
142 let rows: Vec<_> = record.into_rows().collect();
143 assert_eq!(rows.len(), 1);
144 assert_eq!(rows[0].0, Op::Delete);
145 assert_eq!(rows[0].1, OwnedRow::new(vec![Some(1.into())]));
146
147 let record = Record::Update {
148 old_row: OwnedRow::new(vec![Some(1.into())]),
149 new_row: OwnedRow::new(vec![Some(2.into())]),
150 };
151 let rows: Vec<_> = record.into_rows().collect();
152 assert_eq!(rows.len(), 2);
153 assert_eq!(rows[0].0, Op::UpdateDelete);
154 assert_eq!(rows[0].1, OwnedRow::new(vec![Some(1.into())]));
155 assert_eq!(rows[1].0, Op::UpdateInsert);
156 assert_eq!(rows[1].1, OwnedRow::new(vec![Some(2.into())]));
157 }
158
159 #[test]
160 fn test_to_stream_chunk() {
161 let record = Record::Insert {
162 new_row: OwnedRow::new(vec![Some(1i64.into())]),
163 };
164 let chunk = record.to_stream_chunk(&[DataType::Int64]);
165 assert_eq!(
166 chunk,
167 StreamChunk::from_pretty(
168 " I
169 + 1"
170 )
171 );
172
173 let record = Record::Delete {
174 old_row: OwnedRow::new(vec![Some(1i64.into())]),
175 };
176 let chunk = record.to_stream_chunk(&[DataType::Int64]);
177 assert_eq!(
178 chunk,
179 StreamChunk::from_pretty(
180 " I
181 - 1"
182 )
183 );
184
185 let record = Record::Update {
186 old_row: OwnedRow::new(vec![Some(1i64.into())]),
187 new_row: OwnedRow::new(vec![Some(2i64.into())]),
188 };
189 let chunk = record.to_stream_chunk(&[DataType::Int64]);
190 assert_eq!(
191 chunk,
192 StreamChunk::from_pretty(
193 " I
194 U- 1
195 U+ 2"
196 )
197 );
198 }
199}