risingwave_common/array/
stream_record.rs1use auto_enums::auto_enum;
16
17use super::StreamChunk;
18use crate::array::Op;
19use crate::row::Row;
20use crate::types::DataType;
21
22#[derive(Debug, Copy, Clone)]
24pub enum RecordType {
25 Insert,
26 Delete,
27 Update,
28}
29
30impl RecordType {
31 pub fn ops(self) -> &'static [Op] {
33 match self {
34 RecordType::Insert => &[Op::Insert],
35 RecordType::Delete => &[Op::Delete],
36 RecordType::Update => &[Op::UpdateDelete, Op::UpdateInsert],
37 }
38 }
39}
40
41#[derive(Debug, Clone, Copy)]
43pub enum Record<R: Row> {
44 Insert { new_row: R },
45 Delete { old_row: R },
46 Update { old_row: R, new_row: R },
47}
48
49impl<R: Row> Record<R> {
50 #[auto_enum(Iterator)]
52 pub fn into_rows(self) -> impl Iterator<Item = (Op, R)> {
53 match self {
54 Record::Insert { new_row } => std::iter::once((Op::Insert, new_row)),
55 Record::Delete { old_row } => std::iter::once((Op::Delete, old_row)),
56 Record::Update { old_row, new_row } => {
57 [(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)].into_iter()
58 }
59 }
60 }
61
62 pub fn to_record_type(&self) -> RecordType {
64 match self {
65 Record::Insert { .. } => RecordType::Insert,
66 Record::Delete { .. } => RecordType::Delete,
67 Record::Update { .. } => RecordType::Update,
68 }
69 }
70
71 pub fn to_stream_chunk(&self, data_types: &[DataType]) -> StreamChunk {
73 match self {
74 Record::Insert { new_row } => {
75 StreamChunk::from_rows(&[(Op::Insert, new_row)], data_types)
76 }
77 Record::Delete { old_row } => {
78 StreamChunk::from_rows(&[(Op::Delete, old_row)], data_types)
79 }
80 Record::Update { old_row, new_row } => StreamChunk::from_rows(
81 &[(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)],
82 data_types,
83 ),
84 }
85 }
86
87 pub fn as_ref(&self) -> Record<&R> {
89 match self {
90 Record::Insert { new_row } => Record::Insert { new_row },
91 Record::Delete { old_row } => Record::Delete { old_row },
92 Record::Update { old_row, new_row } => Record::Update { old_row, new_row },
93 }
94 }
95}
96
97#[cfg(test)]
98mod tests {
99 use super::*;
100 use crate::row::OwnedRow;
101 use crate::test_prelude::StreamChunkTestExt;
102
103 #[test]
104 fn test_into_rows() {
105 let record = Record::Insert {
106 new_row: OwnedRow::new(vec![Some(1.into())]),
107 };
108 let rows: Vec<_> = record.into_rows().collect();
109 assert_eq!(rows.len(), 1);
110 assert_eq!(rows[0].0, Op::Insert);
111 assert_eq!(rows[0].1, OwnedRow::new(vec![Some(1.into())]));
112
113 let record = Record::Delete {
114 old_row: OwnedRow::new(vec![Some(1.into())]),
115 };
116 let rows: Vec<_> = record.into_rows().collect();
117 assert_eq!(rows.len(), 1);
118 assert_eq!(rows[0].0, Op::Delete);
119 assert_eq!(rows[0].1, OwnedRow::new(vec![Some(1.into())]));
120
121 let record = Record::Update {
122 old_row: OwnedRow::new(vec![Some(1.into())]),
123 new_row: OwnedRow::new(vec![Some(2.into())]),
124 };
125 let rows: Vec<_> = record.into_rows().collect();
126 assert_eq!(rows.len(), 2);
127 assert_eq!(rows[0].0, Op::UpdateDelete);
128 assert_eq!(rows[0].1, OwnedRow::new(vec![Some(1.into())]));
129 assert_eq!(rows[1].0, Op::UpdateInsert);
130 assert_eq!(rows[1].1, OwnedRow::new(vec![Some(2.into())]));
131 }
132
133 #[test]
134 fn test_to_stream_chunk() {
135 let record = Record::Insert {
136 new_row: OwnedRow::new(vec![Some(1i64.into())]),
137 };
138 let chunk = record.to_stream_chunk(&[DataType::Int64]);
139 assert_eq!(
140 chunk,
141 StreamChunk::from_pretty(
142 " I
143 + 1"
144 )
145 );
146
147 let record = Record::Delete {
148 old_row: OwnedRow::new(vec![Some(1i64.into())]),
149 };
150 let chunk = record.to_stream_chunk(&[DataType::Int64]);
151 assert_eq!(
152 chunk,
153 StreamChunk::from_pretty(
154 " I
155 - 1"
156 )
157 );
158
159 let record = Record::Update {
160 old_row: OwnedRow::new(vec![Some(1i64.into())]),
161 new_row: OwnedRow::new(vec![Some(2i64.into())]),
162 };
163 let chunk = record.to_stream_chunk(&[DataType::Int64]);
164 assert_eq!(
165 chunk,
166 StreamChunk::from_pretty(
167 " I
168 U- 1
169 U+ 2"
170 )
171 );
172 }
173}