risingwave_common/array/
data_chunk_iter.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::hash::Hash;
16use std::iter::{FusedIterator, TrustedLen};
17use std::ops::Range;
18
19use super::ArrayRef;
20use crate::array::DataChunk;
21use crate::row::Row;
22use crate::types::DatumRef;
23
24impl DataChunk {
25    /// Get an iterator for visible rows.
26    pub fn rows(&self) -> DataChunkRefIter<'_> {
27        self.rows_in(0..self.capacity())
28    }
29
30    /// Get an iterator for visible rows in range.
31    pub fn rows_in(&self, range: Range<usize>) -> DataChunkRefIter<'_> {
32        DataChunkRefIter {
33            chunk: self,
34            idx: range,
35        }
36    }
37
38    /// Get an iterator for all rows in the chunk, and a `None` represents an invisible row.
39    pub fn rows_with_holes(&self) -> DataChunkRefIterWithHoles<'_> {
40        DataChunkRefIterWithHoles {
41            chunk: self,
42            idx: 0,
43        }
44    }
45}
46
47pub struct DataChunkRefIter<'a> {
48    chunk: &'a DataChunk,
49    idx: Range<usize>,
50}
51
52impl<'a> Iterator for DataChunkRefIter<'a> {
53    type Item = RowRef<'a>;
54
55    fn next(&mut self) -> Option<Self::Item> {
56        if self.idx.start == self.idx.end {
57            return None;
58        }
59        match self.chunk.next_visible_row_idx(self.idx.start) {
60            Some(idx) if idx < self.idx.end => {
61                self.idx.start = idx + 1;
62                Some(RowRef::new(self.chunk, idx))
63            }
64            _ => {
65                self.idx.start = self.idx.end;
66                None
67            }
68        }
69    }
70
71    fn size_hint(&self) -> (usize, Option<usize>) {
72        if self.idx.start != self.idx.end {
73            (
74                // if all following rows are invisible
75                0,
76                // if all following rows are visible
77                Some(std::cmp::min(
78                    self.idx.end - self.idx.start,
79                    self.chunk.cardinality(),
80                )),
81            )
82        } else {
83            (0, Some(0))
84        }
85    }
86}
87
88impl FusedIterator for DataChunkRefIter<'_> {}
89
90pub struct DataChunkRefIterWithHoles<'a> {
91    chunk: &'a DataChunk,
92    idx: usize,
93}
94
95impl<'a> Iterator for DataChunkRefIterWithHoles<'a> {
96    type Item = Option<RowRef<'a>>;
97
98    fn next(&mut self) -> Option<Self::Item> {
99        let len = self.chunk.capacity();
100        let vis = self.chunk.visibility();
101        if self.idx == len {
102            None
103        } else {
104            let ret = Some(if !vis.is_set(self.idx) {
105                None
106            } else {
107                Some(RowRef::new(self.chunk, self.idx))
108            });
109            self.idx += 1;
110            ret
111        }
112    }
113
114    fn size_hint(&self) -> (usize, Option<usize>) {
115        let size = self.chunk.capacity() - self.idx;
116        (size, Some(size))
117    }
118}
119
120impl ExactSizeIterator for DataChunkRefIterWithHoles<'_> {}
121unsafe impl TrustedLen for DataChunkRefIterWithHoles<'_> {}
122
123// Deliberately making `RowRef` and `RowRefIter` defined in a private module to ensure
124// the checks in the constructors are always performed.
125mod row_ref {
126    use super::*;
127
128    #[derive(Clone, Copy)]
129    pub struct RowRef<'a> {
130        columns: &'a [ArrayRef],
131
132        idx: usize,
133    }
134
135    impl std::fmt::Debug for RowRef<'_> {
136        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137            f.debug_list().entries(self.iter()).finish()
138        }
139    }
140
141    impl<'a> RowRef<'a> {
142        pub fn new(chunk: &'a DataChunk, idx: usize) -> Self {
143            assert!(
144                idx < chunk.capacity(),
145                "index {idx} out of bound {}",
146                chunk.capacity()
147            );
148
149            Self {
150                columns: chunk.columns(),
151                idx,
152            }
153        }
154
155        pub fn with_columns(columns: &'a [ArrayRef], idx: usize) -> Self {
156            for column in columns {
157                assert!(
158                    idx < column.len(),
159                    "index {idx} out of bound {}",
160                    column.len()
161                );
162            }
163
164            Self { columns, idx }
165        }
166
167        /// Get the index of this row in the data chunk.
168        #[must_use]
169        pub fn index(&self) -> usize {
170            self.idx
171        }
172    }
173
174    impl PartialEq for RowRef<'_> {
175        fn eq(&self, other: &Self) -> bool {
176            self.iter().eq(other.iter())
177        }
178    }
179    impl Eq for RowRef<'_> {}
180
181    impl Hash for RowRef<'_> {
182        fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
183            self.hash_datums_into(state)
184        }
185    }
186
187    impl Row for RowRef<'_> {
188        fn datum_at(&self, index: usize) -> DatumRef<'_> {
189            // SAFETY: `self.idx` is already checked in `new` or `with_columns`.
190            unsafe { self.columns[index].value_at_unchecked(self.idx) }
191        }
192
193        unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_> {
194            unsafe {
195                self.columns
196                    .get_unchecked(index)
197                    .value_at_unchecked(self.idx)
198            }
199        }
200
201        fn len(&self) -> usize {
202            self.columns.len()
203        }
204
205        fn iter(&self) -> impl ExactSizeIterator<Item = DatumRef<'_>> {
206            RowRefIter {
207                columns: self.columns.iter(),
208                row_idx: self.idx,
209            }
210        }
211    }
212
213    #[derive(Clone)]
214    pub struct RowRefIter<'a> {
215        columns: std::slice::Iter<'a, ArrayRef>,
216        row_idx: usize,
217    }
218
219    impl<'a> Iterator for RowRefIter<'a> {
220        type Item = DatumRef<'a>;
221
222        fn next(&mut self) -> Option<Self::Item> {
223            // SAFETY: `self.row_idx` is already checked in `new` or `with_columns` of `RowRef`.
224            unsafe {
225                self.columns
226                    .next()
227                    .map(|col| col.value_at_unchecked(self.row_idx))
228            }
229        }
230
231        fn size_hint(&self) -> (usize, Option<usize>) {
232            self.columns.size_hint()
233        }
234    }
235
236    impl ExactSizeIterator for RowRefIter<'_> {}
237    unsafe impl TrustedLen for RowRefIter<'_> {}
238}
239
240pub use row_ref::{RowRef, RowRefIter};
241
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use crate::array::StreamChunk;
    use crate::test_prelude::StreamChunkTestExt;

    /// `RowRef`s holding the same datums must hash and compare equal even when they come
    /// from different chunks, so a `HashSet` keeps only one copy of each.
    #[test]
    fn test_row_ref_hash() {
        let mut set = HashSet::new();

        // `2 5 1` appears twice and `4 9 2` once: two distinct rows.
        let chunk1 = StreamChunk::from_pretty(
            " I I I
            + 2 5 1
            + 4 9 2
            - 2 5 1",
        );
        set.extend(chunk1.rows().map(|(_, row)| row));
        assert_eq!(set.len(), 2);

        // `4 9 2` from a different chunk is already in the set.
        let chunk2 = StreamChunk::from_pretty(
            " I I I
            - 4 9 2",
        );
        set.extend(chunk2.rows().map(|(_, row)| row));
        assert_eq!(set.len(), 2);

        // `1 2 3` has not been seen before.
        let chunk3 = StreamChunk::from_pretty(
            " I I I
            + 1 2 3",
        );
        set.extend(chunk3.rows().map(|(_, row)| row));
        assert_eq!(set.len(), 3);
    }
}