risingwave_common/array/
stream_record.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use auto_enums::auto_enum;
16
17use super::StreamChunk;
18use crate::array::Op;
19use crate::row::Row;
20use crate::types::DataType;
21
22/// Type of a row change, without row data.
23#[derive(Debug, Copy, Clone)]
24pub enum RecordType {
25    Insert,
26    Delete,
27    Update,
28}
29
30impl RecordType {
31    /// Get the corresponding `Op`s for this record type.
32    pub fn ops(self) -> &'static [Op] {
33        match self {
34            RecordType::Insert => &[Op::Insert],
35            RecordType::Delete => &[Op::Delete],
36            RecordType::Update => &[Op::UpdateDelete, Op::UpdateInsert],
37        }
38    }
39}
40
41/// Generic type to represent a row change.
42#[derive(Debug, Clone, Copy)]
43pub enum Record<R: Row> {
44    Insert { new_row: R },
45    Delete { old_row: R },
46    Update { old_row: R, new_row: R },
47}
48
49impl<R: Row> Record<R> {
50    /// Convert this stream record to one or two rows with corresponding ops.
51    #[auto_enum(Iterator)]
52    pub fn into_rows(self) -> impl Iterator<Item = (Op, R)> {
53        match self {
54            Record::Insert { new_row } => std::iter::once((Op::Insert, new_row)),
55            Record::Delete { old_row } => std::iter::once((Op::Delete, old_row)),
56            Record::Update { old_row, new_row } => {
57                [(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)].into_iter()
58            }
59        }
60    }
61
62    /// Get record type of this record.
63    pub fn to_record_type(&self) -> RecordType {
64        match self {
65            Record::Insert { .. } => RecordType::Insert,
66            Record::Delete { .. } => RecordType::Delete,
67            Record::Update { .. } => RecordType::Update,
68        }
69    }
70
71    /// Convert this stream record to a stream chunk containing only 1 or 2 rows.
72    pub fn to_stream_chunk(&self, data_types: &[DataType]) -> StreamChunk {
73        match self {
74            Record::Insert { new_row } => {
75                StreamChunk::from_rows(&[(Op::Insert, new_row)], data_types)
76            }
77            Record::Delete { old_row } => {
78                StreamChunk::from_rows(&[(Op::Delete, old_row)], data_types)
79            }
80            Record::Update { old_row, new_row } => StreamChunk::from_rows(
81                &[(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)],
82                data_types,
83            ),
84        }
85    }
86
87    /// Convert from `&Record<R>` to `Record<&R>`.
88    pub fn as_ref(&self) -> Record<&R> {
89        match self {
90            Record::Insert { new_row } => Record::Insert { new_row },
91            Record::Delete { old_row } => Record::Delete { old_row },
92            Record::Update { old_row, new_row } => Record::Update { old_row, new_row },
93        }
94    }
95}
96
97#[cfg(test)]
98mod tests {
99    use super::*;
100    use crate::row::OwnedRow;
101    use crate::test_prelude::StreamChunkTestExt;
102
103    #[test]
104    fn test_into_rows() {
105        let record = Record::Insert {
106            new_row: OwnedRow::new(vec![Some(1.into())]),
107        };
108        let rows: Vec<_> = record.into_rows().collect();
109        assert_eq!(rows.len(), 1);
110        assert_eq!(rows[0].0, Op::Insert);
111        assert_eq!(rows[0].1, OwnedRow::new(vec![Some(1.into())]));
112
113        let record = Record::Delete {
114            old_row: OwnedRow::new(vec![Some(1.into())]),
115        };
116        let rows: Vec<_> = record.into_rows().collect();
117        assert_eq!(rows.len(), 1);
118        assert_eq!(rows[0].0, Op::Delete);
119        assert_eq!(rows[0].1, OwnedRow::new(vec![Some(1.into())]));
120
121        let record = Record::Update {
122            old_row: OwnedRow::new(vec![Some(1.into())]),
123            new_row: OwnedRow::new(vec![Some(2.into())]),
124        };
125        let rows: Vec<_> = record.into_rows().collect();
126        assert_eq!(rows.len(), 2);
127        assert_eq!(rows[0].0, Op::UpdateDelete);
128        assert_eq!(rows[0].1, OwnedRow::new(vec![Some(1.into())]));
129        assert_eq!(rows[1].0, Op::UpdateInsert);
130        assert_eq!(rows[1].1, OwnedRow::new(vec![Some(2.into())]));
131    }
132
133    #[test]
134    fn test_to_stream_chunk() {
135        let record = Record::Insert {
136            new_row: OwnedRow::new(vec![Some(1i64.into())]),
137        };
138        let chunk = record.to_stream_chunk(&[DataType::Int64]);
139        assert_eq!(
140            chunk,
141            StreamChunk::from_pretty(
142                " I
143                + 1"
144            )
145        );
146
147        let record = Record::Delete {
148            old_row: OwnedRow::new(vec![Some(1i64.into())]),
149        };
150        let chunk = record.to_stream_chunk(&[DataType::Int64]);
151        assert_eq!(
152            chunk,
153            StreamChunk::from_pretty(
154                " I
155                - 1"
156            )
157        );
158
159        let record = Record::Update {
160            old_row: OwnedRow::new(vec![Some(1i64.into())]),
161            new_row: OwnedRow::new(vec![Some(2i64.into())]),
162        };
163        let chunk = record.to_stream_chunk(&[DataType::Int64]);
164        assert_eq!(
165            chunk,
166            StreamChunk::from_pretty(
167                "  I
168                U- 1
169                U+ 2"
170            )
171        );
172    }
173}