risingwave_common/array/
stream_chunk_iter.rs1use std::ops::Range;
16
17use super::RowRef;
18use super::data_chunk_iter::DataChunkRefIter;
19use super::stream_record::Record;
20use crate::array::{Op, StreamChunk};
21
22impl StreamChunk {
23 pub fn records(&self) -> StreamChunkRefIter<'_> {
25 StreamChunkRefIter {
26 chunk: self,
27 inner: self.data_chunk().rows(),
28 }
29 }
30
31 pub fn rows(&self) -> impl Iterator<Item = (Op, RowRef<'_>)> {
35 self.rows_in(0..self.capacity())
36 }
37
38 pub fn rows_in(&self, range: Range<usize>) -> impl Iterator<Item = (Op, RowRef<'_>)> {
40 self.data_chunk().rows_in(range).map(|row| {
41 (
42 unsafe { *self.ops().get_unchecked(row.index()) },
44 row,
45 )
46 })
47 }
48
49 pub fn row_at(&self, pos: usize) -> (Op, RowRef<'_>, bool) {
51 let op = self.ops()[pos];
52 let (row, visible) = self.data_chunk().row_at(pos);
53 (op, row, visible)
54 }
55
56 pub fn rows_with_holes(&self) -> impl ExactSizeIterator<Item = Option<(Op, RowRef<'_>)>> {
57 self.data_chunk().rows_with_holes().map(|row| {
58 row.map(|row| {
59 (
60 unsafe { *self.ops().get_unchecked(row.index()) },
62 row,
63 )
64 })
65 })
66 }
67}
68
69pub struct StreamChunkRefIter<'a> {
70 chunk: &'a StreamChunk,
71
72 inner: DataChunkRefIter<'a>,
73}
74
75impl<'a> Iterator for StreamChunkRefIter<'a> {
76 type Item = Record<RowRef<'a>>;
77
78 fn next(&mut self) -> Option<Self::Item> {
79 let row = self.inner.next()?;
80 let op = unsafe { self.chunk.ops().get_unchecked(row.index()) };
82
83 match op {
84 Op::Insert => Some(Record::Insert { new_row: row }),
85 Op::Delete => Some(Record::Delete { old_row: row }),
86 Op::UpdateDelete => {
87 let insert_row = self.inner.next().expect("expect a row after U-");
88 let op = unsafe { *self.chunk.ops().get_unchecked(insert_row.index()) };
90 debug_assert_eq!(op, Op::UpdateInsert, "expect a U+ after U-");
91
92 Some(Record::Update {
93 old_row: row,
94 new_row: insert_row,
95 })
96 }
97 Op::UpdateInsert => panic!("expect a U- before U+"),
98 }
99 }
100
101 fn size_hint(&self) -> (usize, Option<usize>) {
102 let (lower, upper) = self.inner.size_hint();
103 (lower / 2, upper)
104 }
105}
106
107#[cfg(test)]
108mod tests {
109 extern crate test;
110 use itertools::Itertools;
111 use test::Bencher;
112
113 use crate::array::stream_record::Record;
114 use crate::row::Row;
115 use crate::test_utils::test_stream_chunk::{
116 BigStreamChunk, TestStreamChunk, WhatEverStreamChunk,
117 };
118
119 #[test]
120 fn test_chunk_rows() {
121 let test = WhatEverStreamChunk;
122 let chunk = test.stream_chunk();
123 let mut rows = chunk.rows().map(|(op, row)| (op, row.into_owned_row()));
124 assert_eq!(Some(test.row_with_op_at(0)), rows.next());
125 assert_eq!(Some(test.row_with_op_at(1)), rows.next());
126 assert_eq!(Some(test.row_with_op_at(2)), rows.next());
127 assert_eq!(Some(test.row_with_op_at(3)), rows.next());
128 }
129
130 #[test]
131 fn test_chunk_records() {
132 let test = WhatEverStreamChunk;
133 let chunk = test.stream_chunk();
134 let mut rows = chunk
135 .records()
136 .flat_map(Record::into_rows)
137 .map(|(op, row)| (op, row.into_owned_row()));
138 assert_eq!(Some(test.row_with_op_at(0)), rows.next());
139 assert_eq!(Some(test.row_with_op_at(1)), rows.next());
140 assert_eq!(Some(test.row_with_op_at(2)), rows.next());
141 assert_eq!(Some(test.row_with_op_at(3)), rows.next());
142 }
143
144 #[bench]
145 fn bench_rows_iterator_from_records(b: &mut Bencher) {
146 let chunk = BigStreamChunk::new(10000).stream_chunk();
147 b.iter(|| {
148 for (_op, row) in chunk.records().flat_map(Record::into_rows) {
149 test::black_box(row.iter().count());
150 }
151 })
152 }
153
154 #[bench]
155 fn bench_rows_iterator(b: &mut Bencher) {
156 let chunk = BigStreamChunk::new(10000).stream_chunk();
157 b.iter(|| {
158 for (_op, row) in chunk.rows() {
159 test::black_box(row.iter().count());
160 }
161 })
162 }
163
164 #[bench]
165 fn bench_rows_iterator_vec_of_datum_refs(b: &mut Bencher) {
166 let chunk = BigStreamChunk::new(10000).stream_chunk();
167 b.iter(|| {
168 for (_op, row) in chunk.rows() {
169 let row = row.iter().collect_vec();
171 test::black_box(row);
172 }
173 })
174 }
175
176 #[bench]
177 fn bench_record_iterator(b: &mut Bencher) {
178 let chunk = BigStreamChunk::new(10000).stream_chunk();
179 b.iter(|| {
180 for record in chunk.records() {
181 match record {
182 Record::Insert { new_row } => test::black_box(new_row.iter().count()),
183 Record::Delete { old_row } => test::black_box(old_row.iter().count()),
184 Record::Update { old_row, new_row } => {
185 test::black_box(old_row.iter().count());
186 test::black_box(new_row.iter().count())
187 }
188 };
189 }
190 })
191 }
192}