risingwave_common/array/
stream_chunk_iter.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Range;
16
17use super::RowRef;
18use super::data_chunk_iter::DataChunkRefIter;
19use super::stream_record::Record;
20use crate::array::{Op, StreamChunk};
21
22impl StreamChunk {
23    /// Return an iterator on stream records of this stream chunk.
24    pub fn records(&self) -> StreamChunkRefIter<'_> {
25        StreamChunkRefIter {
26            chunk: self,
27            inner: self.data_chunk().rows(),
28        }
29    }
30
31    /// Return an iterator on rows of this stream chunk.
32    ///
33    /// Should consider using [`StreamChunk::records`] if possible.
34    pub fn rows(&self) -> impl Iterator<Item = (Op, RowRef<'_>)> {
35        self.rows_in(0..self.capacity())
36    }
37
38    /// Return an iterator on rows of this stream chunk in a range.
39    pub fn rows_in(&self, range: Range<usize>) -> impl Iterator<Item = (Op, RowRef<'_>)> {
40        self.data_chunk().rows_in(range).map(|row| {
41            (
42                // SAFETY: index is checked since we are in the iterator.
43                unsafe { *self.ops().get_unchecked(row.index()) },
44                row,
45            )
46        })
47    }
48
49    /// Random access a row at `pos`. Return the op, data and whether the row is visible.
50    pub fn row_at(&self, pos: usize) -> (Op, RowRef<'_>, bool) {
51        let op = self.ops()[pos];
52        let (row, visible) = self.data_chunk().row_at(pos);
53        (op, row, visible)
54    }
55
56    pub fn rows_with_holes(&self) -> impl ExactSizeIterator<Item = Option<(Op, RowRef<'_>)>> {
57        self.data_chunk().rows_with_holes().map(|row| {
58            row.map(|row| {
59                (
60                    // SAFETY: index is checked since we are in the iterator.
61                    unsafe { *self.ops().get_unchecked(row.index()) },
62                    row,
63                )
64            })
65        })
66    }
67}
68
69pub struct StreamChunkRefIter<'a> {
70    chunk: &'a StreamChunk,
71
72    inner: DataChunkRefIter<'a>,
73}
74
75impl<'a> Iterator for StreamChunkRefIter<'a> {
76    type Item = Record<RowRef<'a>>;
77
78    fn next(&mut self) -> Option<Self::Item> {
79        let row = self.inner.next()?;
80        // SAFETY: index is checked since `row` is `Some`.
81        let op = unsafe { self.chunk.ops().get_unchecked(row.index()) };
82
83        match op {
84            Op::Insert => Some(Record::Insert { new_row: row }),
85            Op::Delete => Some(Record::Delete { old_row: row }),
86            Op::UpdateDelete => {
87                let insert_row = self.inner.next().expect("expect a row after U-");
88                // SAFETY: index is checked since `insert_row` is `Some`.
89                let op = unsafe { *self.chunk.ops().get_unchecked(insert_row.index()) };
90                debug_assert_eq!(op, Op::UpdateInsert, "expect a U+ after U-");
91
92                Some(Record::Update {
93                    old_row: row,
94                    new_row: insert_row,
95                })
96            }
97            Op::UpdateInsert => panic!("expect a U- before U+"),
98        }
99    }
100
101    fn size_hint(&self) -> (usize, Option<usize>) {
102        let (lower, upper) = self.inner.size_hint();
103        (lower / 2, upper)
104    }
105}
106
#[cfg(test)]
mod tests {
    extern crate test;
    use itertools::Itertools;
    use test::Bencher;

    use crate::array::stream_record::Record;
    use crate::row::Row;
    use crate::test_utils::test_stream_chunk::{
        BigStreamChunk, TestStreamChunk, WhatEverStreamChunk,
    };

    /// `rows()` must yield the fixture's first four rows, with their ops, in order.
    #[test]
    fn test_chunk_rows() {
        let fixture = WhatEverStreamChunk;
        let chunk = fixture.stream_chunk();
        let mut actual = chunk.rows().map(|(op, row)| (op, row.into_owned_row()));
        for idx in 0..4 {
            assert_eq!(Some(fixture.row_with_op_at(idx)), actual.next());
        }
    }

    /// Flattening `records()` back into rows must give the same first four rows as the
    /// fixture.
    #[test]
    fn test_chunk_records() {
        let fixture = WhatEverStreamChunk;
        let chunk = fixture.stream_chunk();
        let mut actual = chunk
            .records()
            .flat_map(Record::into_rows)
            .map(|(op, row)| (op, row.into_owned_row()));
        for idx in 0..4 {
            assert_eq!(Some(fixture.row_with_op_at(idx)), actual.next());
        }
    }

    /// Row iteration obtained by flattening records.
    #[bench]
    fn bench_rows_iterator_from_records(b: &mut Bencher) {
        let chunk = BigStreamChunk::new(10000).stream_chunk();
        b.iter(|| {
            for (_op, row) in chunk.records().flat_map(Record::into_rows) {
                let datum_count = row.iter().count();
                test::black_box(datum_count);
            }
        })
    }

    /// Plain row iteration.
    #[bench]
    fn bench_rows_iterator(b: &mut Bencher) {
        let chunk = BigStreamChunk::new(10000).stream_chunk();
        b.iter(|| {
            for (_op, row) in chunk.rows() {
                let datum_count = row.iter().count();
                test::black_box(datum_count);
            }
        })
    }

    /// Row iteration that materializes every row into a vector of datum refs.
    #[bench]
    fn bench_rows_iterator_vec_of_datum_refs(b: &mut Bencher) {
        let chunk = BigStreamChunk::new(10000).stream_chunk();
        b.iter(|| {
            for (_op, row) in chunk.rows() {
                // Mimic the old `RowRef(Vec<DatumRef>)`
                let datums = row.iter().collect_vec();
                test::black_box(datums);
            }
        })
    }

    /// Record iteration, touching every row carried by each record.
    #[bench]
    fn bench_record_iterator(b: &mut Bencher) {
        let chunk = BigStreamChunk::new(10000).stream_chunk();
        b.iter(|| {
            for record in chunk.records() {
                match record {
                    Record::Insert { new_row } => {
                        test::black_box(new_row.iter().count());
                    }
                    Record::Delete { old_row } => {
                        test::black_box(old_row.iter().count());
                    }
                    Record::Update { old_row, new_row } => {
                        test::black_box(old_row.iter().count());
                        test::black_box(new_row.iter().count());
                    }
                }
            }
        })
    }
}