risingwave_common/array/
stream_record.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::convert::Infallible;
16
17use auto_enums::auto_enum;
18
19use super::StreamChunk;
20use crate::array::Op;
21use crate::row::Row;
22use crate::types::DataType;
23
24/// Type of a row change, without row data.
25#[derive(Debug, Copy, Clone)]
26pub enum RecordType {
27    Insert,
28    Delete,
29    Update,
30}
31
32impl RecordType {
33    /// Get the corresponding `Op`s for this record type.
34    pub fn ops(self) -> &'static [Op] {
35        match self {
36            RecordType::Insert => &[Op::Insert],
37            RecordType::Delete => &[Op::Delete],
38            RecordType::Update => &[Op::UpdateDelete, Op::UpdateInsert],
39        }
40    }
41}
42
43/// Generic type to represent a row change.
44#[derive(Debug, Clone, Copy)]
45pub enum Record<R> {
46    Insert { new_row: R },
47    Delete { old_row: R },
48    Update { old_row: R, new_row: R },
49}
50
51impl<R> Record<R> {
52    /// Convert this stream record to one or two rows with corresponding ops.
53    #[auto_enum(Iterator)]
54    pub fn into_rows(self) -> impl Iterator<Item = (Op, R)> {
55        match self {
56            Record::Insert { new_row } => std::iter::once((Op::Insert, new_row)),
57            Record::Delete { old_row } => std::iter::once((Op::Delete, old_row)),
58            Record::Update { old_row, new_row } => {
59                [(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)].into_iter()
60            }
61        }
62    }
63
64    /// Get record type of this record.
65    pub fn to_record_type(&self) -> RecordType {
66        match self {
67            Record::Insert { .. } => RecordType::Insert,
68            Record::Delete { .. } => RecordType::Delete,
69            Record::Update { .. } => RecordType::Update,
70        }
71    }
72
73    /// Convert from `&Record<R>` to `Record<&R>`.
74    pub fn as_ref(&self) -> Record<&R> {
75        match self {
76            Record::Insert { new_row } => Record::Insert { new_row },
77            Record::Delete { old_row } => Record::Delete { old_row },
78            Record::Update { old_row, new_row } => Record::Update { old_row, new_row },
79        }
80    }
81
82    /// Convert this record to upsert format, by rewriting `Update` to `Insert` and only keeping
83    /// the new row.
84    pub fn into_upsert(self) -> Self {
85        match self {
86            Record::Insert { new_row } => Record::Insert { new_row },
87            Record::Delete { old_row } => Record::Delete { old_row },
88            Record::Update { new_row, .. } => Record::Insert { new_row },
89        }
90    }
91
92    /// Try mapping the row in the record to another row, returning error if any of the mapping fails.
93    pub fn try_map<R2, E>(self, f: impl Fn(R) -> Result<R2, E>) -> Result<Record<R2>, E> {
94        Ok(match self {
95            Record::Insert { new_row } => Record::Insert {
96                new_row: f(new_row)?,
97            },
98            Record::Delete { old_row } => Record::Delete {
99                old_row: f(old_row)?,
100            },
101            Record::Update { old_row, new_row } => Record::Update {
102                old_row: f(old_row)?,
103                new_row: f(new_row)?,
104            },
105        })
106    }
107
108    /// Map the row in the record to another row.
109    pub fn map<R2>(self, f: impl Fn(R) -> R2) -> Record<R2> {
110        let Ok(record) = self.try_map::<R2, Infallible>(|row| Ok(f(row)));
111        record
112    }
113}
114
115impl<R: Row> Record<R> {
116    /// Convert this stream record to a stream chunk containing only 1 or 2 rows.
117    pub fn to_stream_chunk(&self, data_types: &[DataType]) -> StreamChunk {
118        match self {
119            Record::Insert { new_row } => {
120                StreamChunk::from_rows(&[(Op::Insert, new_row)], data_types)
121            }
122            Record::Delete { old_row } => {
123                StreamChunk::from_rows(&[(Op::Delete, old_row)], data_types)
124            }
125            Record::Update { old_row, new_row } => StreamChunk::from_rows(
126                &[(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)],
127                data_types,
128            ),
129        }
130    }
131}
132
133#[cfg(test)]
134mod tests {
135    use super::*;
136    use crate::row::OwnedRow;
137    use crate::test_prelude::StreamChunkTestExt;
138
139    #[test]
140    fn test_into_rows() {
141        let record = Record::Insert {
142            new_row: OwnedRow::new(vec![Some(1.into())]),
143        };
144        let rows: Vec<_> = record.into_rows().collect();
145        assert_eq!(rows.len(), 1);
146        assert_eq!(rows[0].0, Op::Insert);
147        assert_eq!(rows[0].1, OwnedRow::new(vec![Some(1.into())]));
148
149        let record = Record::Delete {
150            old_row: OwnedRow::new(vec![Some(1.into())]),
151        };
152        let rows: Vec<_> = record.into_rows().collect();
153        assert_eq!(rows.len(), 1);
154        assert_eq!(rows[0].0, Op::Delete);
155        assert_eq!(rows[0].1, OwnedRow::new(vec![Some(1.into())]));
156
157        let record = Record::Update {
158            old_row: OwnedRow::new(vec![Some(1.into())]),
159            new_row: OwnedRow::new(vec![Some(2.into())]),
160        };
161        let rows: Vec<_> = record.into_rows().collect();
162        assert_eq!(rows.len(), 2);
163        assert_eq!(rows[0].0, Op::UpdateDelete);
164        assert_eq!(rows[0].1, OwnedRow::new(vec![Some(1.into())]));
165        assert_eq!(rows[1].0, Op::UpdateInsert);
166        assert_eq!(rows[1].1, OwnedRow::new(vec![Some(2.into())]));
167    }
168
169    #[test]
170    fn test_to_stream_chunk() {
171        let record = Record::Insert {
172            new_row: OwnedRow::new(vec![Some(1i64.into())]),
173        };
174        let chunk = record.to_stream_chunk(&[DataType::Int64]);
175        assert_eq!(
176            chunk,
177            StreamChunk::from_pretty(
178                " I
179                + 1"
180            )
181        );
182
183        let record = Record::Delete {
184            old_row: OwnedRow::new(vec![Some(1i64.into())]),
185        };
186        let chunk = record.to_stream_chunk(&[DataType::Int64]);
187        assert_eq!(
188            chunk,
189            StreamChunk::from_pretty(
190                " I
191                - 1"
192            )
193        );
194
195        let record = Record::Update {
196            old_row: OwnedRow::new(vec![Some(1i64.into())]),
197            new_row: OwnedRow::new(vec![Some(2i64.into())]),
198        };
199        let chunk = record.to_stream_chunk(&[DataType::Int64]);
200        assert_eq!(
201            chunk,
202            StreamChunk::from_pretty(
203                "  I
204                U- 1
205                U+ 2"
206            )
207        );
208    }
209}