risingwave_common/array/
stream_chunk_iter.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Range;
16
17use super::RowRef;
18use super::data_chunk_iter::DataChunkRefIter;
19use super::stream_record::Record;
20use crate::array::{Op, StreamChunk};
21use crate::row::RowExt;
22
23impl StreamChunk {
24    /// Return an iterator on stream records of this stream chunk.
25    pub fn records(&self) -> StreamChunkRefIter<'_> {
26        StreamChunkRefIter {
27            chunk: self,
28            inner: self.data_chunk().rows(),
29        }
30    }
31
32    /// Return an iterator on rows of this stream chunk.
33    ///
34    /// Should consider using [`StreamChunk::records`] if possible.
35    pub fn rows(&self) -> impl Iterator<Item = (Op, RowRef<'_>)> {
36        self.rows_in(0..self.capacity())
37    }
38
39    /// Return an iterator on rows of this stream chunk in a range.
40    pub fn rows_in(&self, range: Range<usize>) -> impl Iterator<Item = (Op, RowRef<'_>)> {
41        self.data_chunk().rows_in(range).map(|row| {
42            (
43                // SAFETY: index is checked since we are in the iterator.
44                unsafe { *self.ops().get_unchecked(row.index()) },
45                row,
46            )
47        })
48    }
49
50    /// Random access a row at `pos`. Return the op, data and whether the row is visible.
51    pub fn row_at(&self, pos: usize) -> (Op, RowRef<'_>, bool) {
52        let op = self.ops()[pos];
53        let (row, visible) = self.data_chunk().row_at(pos);
54        (op, row, visible)
55    }
56
57    pub fn rows_with_holes(&self) -> impl ExactSizeIterator<Item = Option<(Op, RowRef<'_>)>> {
58        self.data_chunk().rows_with_holes().map(|row| {
59            row.map(|row| {
60                (
61                    // SAFETY: index is checked since we are in the iterator.
62                    unsafe { *self.ops().get_unchecked(row.index()) },
63                    row,
64                )
65            })
66        })
67    }
68}
69
70pub struct StreamChunkRefIter<'a> {
71    chunk: &'a StreamChunk,
72
73    inner: DataChunkRefIter<'a>,
74}
75
76impl<'a> Iterator for StreamChunkRefIter<'a> {
77    type Item = Record<RowRef<'a>>;
78
79    fn next(&mut self) -> Option<Self::Item> {
80        let row = self.inner.next()?;
81        // SAFETY: index is checked since `row` is `Some`.
82        let op = unsafe { self.chunk.ops().get_unchecked(row.index()) };
83
84        match op {
85            Op::Insert => Some(Record::Insert { new_row: row }),
86            Op::Delete => Some(Record::Delete { old_row: row }),
87            Op::UpdateDelete => {
88                let next_row = self
89                    .inner
90                    .next()
91                    .unwrap_or_else(|| panic!("expect a row after U-\nU- row: {}", row.display()));
92                // SAFETY: index is checked since `insert_row` is `Some`.
93                let op = unsafe { *self.chunk.ops().get_unchecked(next_row.index()) };
94                debug_assert_eq!(
95                    op,
96                    Op::UpdateInsert,
97                    "expect a U+ after U-\nU- row: {}\nrow after U-: {}",
98                    row.display(),
99                    next_row.display()
100                );
101
102                Some(Record::Update {
103                    old_row: row,
104                    new_row: next_row,
105                })
106            }
107            Op::UpdateInsert => panic!("expect a U- before U+\nU+ row: {}", row.display()),
108        }
109    }
110
111    fn size_hint(&self) -> (usize, Option<usize>) {
112        let (lower, upper) = self.inner.size_hint();
113        (lower / 2, upper)
114    }
115}
116
#[cfg(test)]
mod tests {
    extern crate test;
    use itertools::Itertools;
    use test::Bencher;

    use crate::array::stream_record::Record;
    use crate::row::Row;
    use crate::test_utils::test_stream_chunk::{
        BigStreamChunk, TestStreamChunk, WhatEverStreamChunk,
    };

    /// Row-wise iteration yields the fixture's first four rows, in order, with their ops.
    #[test]
    fn test_chunk_rows() {
        let fixture = WhatEverStreamChunk;
        let chunk = fixture.stream_chunk();
        let mut actual = chunk.rows().map(|(op, row)| (op, row.into_owned_row()));
        for idx in 0..4 {
            assert_eq!(Some(fixture.row_with_op_at(idx)), actual.next());
        }
    }

    /// Flattening the records back into rows yields the same first four rows as the fixture.
    #[test]
    fn test_chunk_records() {
        let fixture = WhatEverStreamChunk;
        let chunk = fixture.stream_chunk();
        let mut actual = chunk
            .records()
            .flat_map(Record::into_rows)
            .map(|(op, row)| (op, row.into_owned_row()));
        for idx in 0..4 {
            assert_eq!(Some(fixture.row_with_op_at(idx)), actual.next());
        }
    }

    /// Benchmarks row iteration that goes through `records()` and flattens back into rows.
    #[bench]
    fn bench_rows_iterator_from_records(bencher: &mut Bencher) {
        let big_chunk = BigStreamChunk::new(10000).stream_chunk();
        bencher.iter(|| {
            for (_op, row) in big_chunk.records().flat_map(Record::into_rows) {
                test::black_box(row.iter().count());
            }
        });
    }

    /// Benchmarks plain row iteration through `rows()`.
    #[bench]
    fn bench_rows_iterator(bencher: &mut Bencher) {
        let big_chunk = BigStreamChunk::new(10000).stream_chunk();
        bencher.iter(|| {
            for (_op, row) in big_chunk.rows() {
                test::black_box(row.iter().count());
            }
        });
    }

    /// Benchmarks row iteration where every row is materialized into a vector of datum refs.
    #[bench]
    fn bench_rows_iterator_vec_of_datum_refs(bencher: &mut Bencher) {
        let big_chunk = BigStreamChunk::new(10000).stream_chunk();
        bencher.iter(|| {
            for (_op, row) in big_chunk.rows() {
                // Mimic the old `RowRef(Vec<DatumRef>)`
                test::black_box(row.iter().collect_vec());
            }
        });
    }

    /// Benchmarks record iteration, touching every row carried by each record.
    #[bench]
    fn bench_record_iterator(bencher: &mut Bencher) {
        let big_chunk = BigStreamChunk::new(10000).stream_chunk();
        bencher.iter(|| {
            for record in big_chunk.records() {
                match record {
                    Record::Insert { new_row: row } => test::black_box(row.iter().count()),
                    Record::Delete { old_row: row } => test::black_box(row.iter().count()),
                    Record::Update { old_row, new_row } => {
                        test::black_box(old_row.iter().count());
                        test::black_box(new_row.iter().count())
                    }
                };
            }
        });
    }
}