risingwave_common/array/
data_chunk_iter.rs1use std::hash::Hash;
16use std::iter::{FusedIterator, TrustedLen};
17use std::ops::Range;
18
19use super::ArrayRef;
20use crate::array::DataChunk;
21use crate::row::Row;
22use crate::types::DatumRef;
23
24impl DataChunk {
25 pub fn rows(&self) -> DataChunkRefIter<'_> {
27 self.rows_in(0..self.capacity())
28 }
29
30 pub fn rows_in(&self, range: Range<usize>) -> DataChunkRefIter<'_> {
32 DataChunkRefIter {
33 chunk: self,
34 idx: range,
35 }
36 }
37
38 pub fn rows_with_holes(&self) -> DataChunkRefIterWithHoles<'_> {
40 DataChunkRefIterWithHoles {
41 chunk: self,
42 idx: 0,
43 }
44 }
45}
46
47pub struct DataChunkRefIter<'a> {
48 chunk: &'a DataChunk,
49 idx: Range<usize>,
50}
51
52impl<'a> Iterator for DataChunkRefIter<'a> {
53 type Item = RowRef<'a>;
54
55 fn next(&mut self) -> Option<Self::Item> {
56 if self.idx.start == self.idx.end {
57 return None;
58 }
59 match self.chunk.next_visible_row_idx(self.idx.start) {
60 Some(idx) if idx < self.idx.end => {
61 self.idx.start = idx + 1;
62 Some(RowRef::new(self.chunk, idx))
63 }
64 _ => {
65 self.idx.start = self.idx.end;
66 None
67 }
68 }
69 }
70
71 fn size_hint(&self) -> (usize, Option<usize>) {
72 if self.idx.start != self.idx.end {
73 (
74 0,
76 Some(std::cmp::min(
78 self.idx.end - self.idx.start,
79 self.chunk.cardinality(),
80 )),
81 )
82 } else {
83 (0, Some(0))
84 }
85 }
86}
87
88impl FusedIterator for DataChunkRefIter<'_> {}
89
90pub struct DataChunkRefIterWithHoles<'a> {
91 chunk: &'a DataChunk,
92 idx: usize,
93}
94
95impl<'a> Iterator for DataChunkRefIterWithHoles<'a> {
96 type Item = Option<RowRef<'a>>;
97
98 fn next(&mut self) -> Option<Self::Item> {
99 let len = self.chunk.capacity();
100 let vis = self.chunk.visibility();
101 if self.idx == len {
102 None
103 } else {
104 let ret = Some(if !vis.is_set(self.idx) {
105 None
106 } else {
107 Some(RowRef::new(self.chunk, self.idx))
108 });
109 self.idx += 1;
110 ret
111 }
112 }
113
114 fn size_hint(&self) -> (usize, Option<usize>) {
115 let size = self.chunk.capacity() - self.idx;
116 (size, Some(size))
117 }
118}
119
120impl ExactSizeIterator for DataChunkRefIterWithHoles<'_> {}
121unsafe impl TrustedLen for DataChunkRefIterWithHoles<'_> {}
122
123mod row_ref {
126 use super::*;
127
128 #[derive(Clone, Copy)]
129 pub struct RowRef<'a> {
130 columns: &'a [ArrayRef],
131
132 idx: usize,
133 }
134
135 impl std::fmt::Debug for RowRef<'_> {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 f.debug_list().entries(self.iter()).finish()
138 }
139 }
140
141 impl<'a> RowRef<'a> {
142 pub fn new(chunk: &'a DataChunk, idx: usize) -> Self {
143 assert!(
144 idx < chunk.capacity(),
145 "index {idx} out of bound {}",
146 chunk.capacity()
147 );
148
149 Self {
150 columns: chunk.columns(),
151 idx,
152 }
153 }
154
155 pub fn with_columns(columns: &'a [ArrayRef], idx: usize) -> Self {
156 for column in columns {
157 assert!(
158 idx < column.len(),
159 "index {idx} out of bound {}",
160 column.len()
161 );
162 }
163
164 Self { columns, idx }
165 }
166
167 #[must_use]
169 pub fn index(&self) -> usize {
170 self.idx
171 }
172 }
173
174 impl PartialEq for RowRef<'_> {
175 fn eq(&self, other: &Self) -> bool {
176 self.iter().eq(other.iter())
177 }
178 }
179 impl Eq for RowRef<'_> {}
180
181 impl Hash for RowRef<'_> {
182 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
183 self.hash_datums_into(state)
184 }
185 }
186
187 impl Row for RowRef<'_> {
188 fn datum_at(&self, index: usize) -> DatumRef<'_> {
189 unsafe { self.columns[index].value_at_unchecked(self.idx) }
191 }
192
193 unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_> {
194 unsafe {
195 self.columns
196 .get_unchecked(index)
197 .value_at_unchecked(self.idx)
198 }
199 }
200
201 fn len(&self) -> usize {
202 self.columns.len()
203 }
204
205 fn iter(&self) -> impl ExactSizeIterator<Item = DatumRef<'_>> {
206 RowRefIter {
207 columns: self.columns.iter(),
208 row_idx: self.idx,
209 }
210 }
211 }
212
213 #[derive(Clone)]
214 pub struct RowRefIter<'a> {
215 columns: std::slice::Iter<'a, ArrayRef>,
216 row_idx: usize,
217 }
218
219 impl<'a> Iterator for RowRefIter<'a> {
220 type Item = DatumRef<'a>;
221
222 fn next(&mut self) -> Option<Self::Item> {
223 unsafe {
225 self.columns
226 .next()
227 .map(|col| col.value_at_unchecked(self.row_idx))
228 }
229 }
230
231 fn size_hint(&self) -> (usize, Option<usize>) {
232 self.columns.size_hint()
233 }
234 }
235
236 impl ExactSizeIterator for RowRefIter<'_> {}
237 unsafe impl TrustedLen for RowRefIter<'_> {}
238}
239
240pub use row_ref::{RowRef, RowRefIter};
241
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use crate::array::StreamChunk;
    use crate::test_prelude::StreamChunkTestExt;

    /// `RowRef` equality and hashing look only at datums, and ops are dropped
    /// here, so a row that reappears (with any op, in any chunk) must not grow
    /// the set.
    #[test]
    fn test_row_ref_hash() {
        let chunk1 = StreamChunk::from_pretty(
            " I I I
            + 2 5 1
            + 4 9 2
            - 2 5 1",
        );
        let chunk2 = StreamChunk::from_pretty(
            " I I I
            - 4 9 2",
        );
        let chunk3 = StreamChunk::from_pretty(
            " I I I
            + 1 2 3",
        );

        let mut set = HashSet::new();

        // Two distinct rows; the second `2 5 1` collapses onto the first.
        set.extend(chunk1.rows().map(|(_, row)| row));
        assert_eq!(set.len(), 2);

        // `4 9 2` is already present from `chunk1`.
        set.extend(chunk2.rows().map(|(_, row)| row));
        assert_eq!(set.len(), 2);

        // `1 2 3` is new.
        set.extend(chunk3.rows().map(|(_, row)| row));
        assert_eq!(set.len(), 3);
    }
}