risingwave_common/array/
stream_chunk_iter.rs1use std::ops::Range;
16
17use super::RowRef;
18use super::data_chunk_iter::DataChunkRefIter;
19use super::stream_record::Record;
20use crate::array::{Op, StreamChunk};
21use crate::row::RowExt;
22
23impl StreamChunk {
24 pub fn records(&self) -> StreamChunkRefIter<'_> {
26 StreamChunkRefIter {
27 chunk: self,
28 inner: self.data_chunk().rows(),
29 }
30 }
31
32 pub fn rows(&self) -> impl Iterator<Item = (Op, RowRef<'_>)> {
36 self.rows_in(0..self.capacity())
37 }
38
39 pub fn rows_in(&self, range: Range<usize>) -> impl Iterator<Item = (Op, RowRef<'_>)> {
41 self.data_chunk().rows_in(range).map(|row| {
42 (
43 unsafe { *self.ops().get_unchecked(row.index()) },
45 row,
46 )
47 })
48 }
49
50 pub fn row_at(&self, pos: usize) -> (Op, RowRef<'_>, bool) {
52 let op = self.ops()[pos];
53 let (row, visible) = self.data_chunk().row_at(pos);
54 (op, row, visible)
55 }
56
57 pub fn rows_with_holes(&self) -> impl ExactSizeIterator<Item = Option<(Op, RowRef<'_>)>> {
58 self.data_chunk().rows_with_holes().map(|row| {
59 row.map(|row| {
60 (
61 unsafe { *self.ops().get_unchecked(row.index()) },
63 row,
64 )
65 })
66 })
67 }
68}
69
70pub struct StreamChunkRefIter<'a> {
71 chunk: &'a StreamChunk,
72
73 inner: DataChunkRefIter<'a>,
74}
75
76impl<'a> Iterator for StreamChunkRefIter<'a> {
77 type Item = Record<RowRef<'a>>;
78
79 fn next(&mut self) -> Option<Self::Item> {
80 let row = self.inner.next()?;
81 let op = unsafe { self.chunk.ops().get_unchecked(row.index()) };
83
84 match op {
85 Op::Insert => Some(Record::Insert { new_row: row }),
86 Op::Delete => Some(Record::Delete { old_row: row }),
87 Op::UpdateDelete => {
88 let next_row = self
89 .inner
90 .next()
91 .unwrap_or_else(|| panic!("expect a row after U-\nU- row: {}", row.display()));
92 let op = unsafe { *self.chunk.ops().get_unchecked(next_row.index()) };
94 debug_assert_eq!(
95 op,
96 Op::UpdateInsert,
97 "expect a U+ after U-\nU- row: {}\nrow after U-: {}",
98 row.display(),
99 next_row.display()
100 );
101
102 Some(Record::Update {
103 old_row: row,
104 new_row: next_row,
105 })
106 }
107 Op::UpdateInsert => panic!("expect a U- before U+\nU+ row: {}", row.display()),
108 }
109 }
110
111 fn size_hint(&self) -> (usize, Option<usize>) {
112 let (lower, upper) = self.inner.size_hint();
113 (lower / 2, upper)
114 }
115}
116
#[cfg(test)]
mod tests {
    extern crate test;
    use itertools::Itertools;
    use test::Bencher;

    use crate::array::stream_record::Record;
    use crate::row::Row;
    use crate::test_utils::test_stream_chunk::{
        BigStreamChunk, TestStreamChunk, WhatEverStreamChunk,
    };

    /// `rows()` must yield the first four rows of the fixture, in order, with their ops.
    #[test]
    fn test_chunk_rows() {
        let fixture = WhatEverStreamChunk;
        let chunk = fixture.stream_chunk();
        let mut actual = chunk.rows().map(|(op, row)| (op, row.into_owned_row()));
        for idx in 0..4 {
            assert_eq!(Some(fixture.row_with_op_at(idx)), actual.next());
        }
    }

    /// Flattening `records()` back into rows must yield the same first four rows as the
    /// fixture, in order, with their ops.
    #[test]
    fn test_chunk_records() {
        let fixture = WhatEverStreamChunk;
        let chunk = fixture.stream_chunk();
        let mut actual = chunk
            .records()
            .flat_map(Record::into_rows)
            .map(|(op, row)| (op, row.into_owned_row()));
        for idx in 0..4 {
            assert_eq!(Some(fixture.row_with_op_at(idx)), actual.next());
        }
    }

    /// Benchmarks row iteration that goes through `records()` and flattens back to rows.
    #[bench]
    fn bench_rows_iterator_from_records(b: &mut Bencher) {
        let chunk = BigStreamChunk::new(10000).stream_chunk();
        b.iter(|| {
            for (_op, row) in chunk.records().flat_map(Record::into_rows) {
                let datum_count = row.iter().count();
                test::black_box(datum_count);
            }
        })
    }

    /// Benchmarks plain row iteration via `rows()`.
    #[bench]
    fn bench_rows_iterator(b: &mut Bencher) {
        let chunk = BigStreamChunk::new(10000).stream_chunk();
        b.iter(|| {
            for (_op, row) in chunk.rows() {
                let datum_count = row.iter().count();
                test::black_box(datum_count);
            }
        })
    }

    /// Benchmarks row iteration where every row is collected into a `Vec` of its items.
    #[bench]
    fn bench_rows_iterator_vec_of_datum_refs(b: &mut Bencher) {
        let chunk = BigStreamChunk::new(10000).stream_chunk();
        b.iter(|| {
            for (_op, row) in chunk.rows() {
                test::black_box(row.iter().collect_vec());
            }
        })
    }

    /// Benchmarks record iteration, touching every row carried by each record.
    #[bench]
    fn bench_record_iterator(b: &mut Bencher) {
        let chunk = BigStreamChunk::new(10000).stream_chunk();
        b.iter(|| {
            for record in chunk.records() {
                match record {
                    Record::Insert { new_row } => {
                        test::black_box(new_row.iter().count());
                    }
                    Record::Delete { old_row } => {
                        test::black_box(old_row.iter().count());
                    }
                    Record::Update { old_row, new_row } => {
                        test::black_box(old_row.iter().count());
                        test::black_box(new_row.iter().count());
                    }
                }
            }
        })
    }
}