risingwave_common/array/
stream_record.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::convert::Infallible;
16
17use auto_enums::auto_enum;
18
19use super::StreamChunk;
20use crate::array::Op;
21use crate::row::Row;
22use crate::types::DataType;
23
/// Type of a row change, without row data.
///
/// This is the payload-free counterpart of [`Record`]: it only tells whether a change is an
/// insertion, a deletion or an update. Being a plain fieldless enum, it can be freely copied,
/// compared and used as a hash key.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum RecordType {
    /// A row is inserted.
    Insert,
    /// A row is deleted.
    Delete,
    /// A row is updated, i.e. an old row is replaced by a new row.
    Update,
}
31
32impl RecordType {
33    /// Get the corresponding `Op`s for this record type.
34    pub fn ops(self) -> &'static [Op] {
35        match self {
36            RecordType::Insert => &[Op::Insert],
37            RecordType::Delete => &[Op::Delete],
38            RecordType::Update => &[Op::UpdateDelete, Op::UpdateInsert],
39        }
40    }
41}
42
/// Generic type to represent a row change.
///
/// `R` is the type used to represent a row. The record is comparable (`PartialEq` / `Eq`)
/// whenever `R` is, so two changes can be checked for equality as a whole instead of
/// comparing their rows one by one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Record<R> {
    /// A row is inserted; `new_row` is the inserted row.
    Insert { new_row: R },
    /// A row is deleted; `old_row` is the deleted row.
    Delete { old_row: R },
    /// A row is updated from `old_row` to `new_row`.
    Update { old_row: R, new_row: R },
}
50
51impl<R> Record<R> {
52    /// Convert this stream record to one or two rows with corresponding ops.
53    #[auto_enum(Iterator)]
54    pub fn into_rows(self) -> impl Iterator<Item = (Op, R)> {
55        match self {
56            Record::Insert { new_row } => std::iter::once((Op::Insert, new_row)),
57            Record::Delete { old_row } => std::iter::once((Op::Delete, old_row)),
58            Record::Update { old_row, new_row } => {
59                [(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)].into_iter()
60            }
61        }
62    }
63
64    /// Get record type of this record.
65    pub fn to_record_type(&self) -> RecordType {
66        match self {
67            Record::Insert { .. } => RecordType::Insert,
68            Record::Delete { .. } => RecordType::Delete,
69            Record::Update { .. } => RecordType::Update,
70        }
71    }
72
73    /// Convert from `&Record<R>` to `Record<&R>`.
74    pub fn as_ref(&self) -> Record<&R> {
75        match self {
76            Record::Insert { new_row } => Record::Insert { new_row },
77            Record::Delete { old_row } => Record::Delete { old_row },
78            Record::Update { old_row, new_row } => Record::Update { old_row, new_row },
79        }
80    }
81
82    /// Try mapping the row in the record to another row, returning error if any of the mapping fails.
83    pub fn try_map<R2, E>(self, f: impl Fn(R) -> Result<R2, E>) -> Result<Record<R2>, E> {
84        Ok(match self {
85            Record::Insert { new_row } => Record::Insert {
86                new_row: f(new_row)?,
87            },
88            Record::Delete { old_row } => Record::Delete {
89                old_row: f(old_row)?,
90            },
91            Record::Update { old_row, new_row } => Record::Update {
92                old_row: f(old_row)?,
93                new_row: f(new_row)?,
94            },
95        })
96    }
97
98    /// Map the row in the record to another row.
99    pub fn map<R2>(self, f: impl Fn(R) -> R2) -> Record<R2> {
100        let Ok(record) = self.try_map::<R2, Infallible>(|row| Ok(f(row)));
101        record
102    }
103}
104
105impl<R: Row> Record<R> {
106    /// Convert this stream record to a stream chunk containing only 1 or 2 rows.
107    pub fn to_stream_chunk(&self, data_types: &[DataType]) -> StreamChunk {
108        match self {
109            Record::Insert { new_row } => {
110                StreamChunk::from_rows(&[(Op::Insert, new_row)], data_types)
111            }
112            Record::Delete { old_row } => {
113                StreamChunk::from_rows(&[(Op::Delete, old_row)], data_types)
114            }
115            Record::Update { old_row, new_row } => StreamChunk::from_rows(
116                &[(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)],
117                data_types,
118            ),
119        }
120    }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use crate::row::OwnedRow;
    use crate::test_prelude::StreamChunkTestExt;

    /// Builds a single-column row holding the given `i32` value.
    fn int32_row(value: i32) -> OwnedRow {
        OwnedRow::new(vec![Some(value.into())])
    }

    /// Builds a single-column row holding the given `i64` value.
    fn int64_row(value: i64) -> OwnedRow {
        OwnedRow::new(vec![Some(value.into())])
    }

    /// `into_rows` must yield one `(op, row)` pair for inserts and deletes, and the
    /// delete/insert pair (in that order) for updates.
    #[test]
    fn test_into_rows() {
        let inserted: Vec<_> = Record::Insert {
            new_row: int32_row(1),
        }
        .into_rows()
        .collect();
        assert_eq!(inserted, vec![(Op::Insert, int32_row(1))]);

        let deleted: Vec<_> = Record::Delete {
            old_row: int32_row(1),
        }
        .into_rows()
        .collect();
        assert_eq!(deleted, vec![(Op::Delete, int32_row(1))]);

        let updated: Vec<_> = Record::Update {
            old_row: int32_row(1),
            new_row: int32_row(2),
        }
        .into_rows()
        .collect();
        assert_eq!(
            updated,
            vec![
                (Op::UpdateDelete, int32_row(1)),
                (Op::UpdateInsert, int32_row(2)),
            ]
        );
    }

    /// `to_stream_chunk` must produce a chunk equal to the pretty-printed expectation for
    /// every record variant.
    #[test]
    fn test_to_stream_chunk() {
        let schema = [DataType::Int64];

        let inserted = Record::Insert {
            new_row: int64_row(1),
        };
        assert_eq!(
            inserted.to_stream_chunk(&schema),
            StreamChunk::from_pretty(
                " I
                + 1"
            )
        );

        let deleted = Record::Delete {
            old_row: int64_row(1),
        };
        assert_eq!(
            deleted.to_stream_chunk(&schema),
            StreamChunk::from_pretty(
                " I
                - 1"
            )
        );

        let updated = Record::Update {
            old_row: int64_row(1),
            new_row: int64_row(2),
        };
        assert_eq!(
            updated.to_stream_chunk(&schema),
            StreamChunk::from_pretty(
                "  I
                U- 1
                U+ 2"
            )
        );
    }
}