// risingwave_common/array/stream_chunk_iter.rs
use std::ops::Range;
use super::data_chunk_iter::DataChunkRefIter;
use super::stream_record::Record;
use super::RowRef;
use crate::array::{Op, StreamChunk};
impl StreamChunk {
    /// Returns an iterator over the [`Record`]s of this chunk.
    ///
    /// `Insert` and `Delete` rows each become a single-row record. A `U-` row is combined with
    /// the row that follows it into a [`Record::Update`].
    ///
    /// The returned iterator panics on a `U+` row that is not preceded by a `U-`. It also panics
    /// if the rows run out right after a `U-`.
    pub fn records(&self) -> StreamChunkRefIter<'_> {
        let inner = self.data_chunk().rows();
        StreamChunkRefIter { chunk: self, inner }
    }

    /// Returns an iterator over `(op, row)` pairs for the rows in `0..self.capacity()`.
    ///
    /// Every row is yielded on its own, so a `U-`/`U+` pair comes out as two separate items.
    /// Use [`StreamChunk::records`] to receive such a pair as one [`Record::Update`].
    pub fn rows(&self) -> impl Iterator<Item = (Op, RowRef<'_>)> {
        let full_range = 0..self.capacity();
        self.rows_in(full_range)
    }

    /// Returns an iterator over `(op, row)` pairs for the rows that
    /// `self.data_chunk().rows_in(range)` yields.
    ///
    /// Each row is paired with the op stored at the same index in `self.ops()`.
    pub fn rows_in(&self, range: Range<usize>) -> impl Iterator<Item = (Op, RowRef<'_>)> {
        let ops = self.ops();
        self.data_chunk().rows_in(range).map(move |row| {
            // SAFETY: `row.index()` is a position produced by the data chunk's own iterator.
            // It is in bounds as long as `ops()` holds one entry per data-chunk row. That is a
            // `StreamChunk` invariant assumed here and not checked in this file.
            let op = unsafe { *ops.get_unchecked(row.index()) };
            (op, row)
        })
    }

    /// Random-accesses the row at `pos`.
    ///
    /// Returns three values:
    /// - the row's op;
    /// - the row itself;
    /// - the visibility flag reported by `self.data_chunk().row_at(pos)`.
    ///
    /// # Panics
    ///
    /// Panics if `pos` is out of bounds for `self.ops()`.
    pub fn row_at(&self, pos: usize) -> (Op, RowRef<'_>, bool) {
        let op = self.ops()[pos];
        let (row, is_visible) = self.data_chunk().row_at(pos);
        (op, row, is_visible)
    }

    /// Like [`StreamChunk::rows`], but the hole positions are kept.
    ///
    /// Wherever `self.data_chunk().rows_with_holes()` yields `None`, this iterator also yields
    /// `None`. Those positions are presumably invisible rows; confirm against `DataChunk`.
    pub fn rows_with_holes(&self) -> impl Iterator<Item = Option<(Op, RowRef<'_>)>> {
        let ops = self.ops();
        self.data_chunk().rows_with_holes().map(move |slot| {
            let row = slot?;
            // SAFETY: same reasoning as in `rows_in`. The index comes from the data chunk's
            // iterator and relies on `ops()` holding one entry per data-chunk row.
            let op = unsafe { *ops.get_unchecked(row.index()) };
            Some((op, row))
        })
    }
}
/// Iterator over the [`Record`]s of a [`StreamChunk`], created by [`StreamChunk::records`].
///
/// Each `U-` row is merged with the row after it into a single [`Record::Update`].
pub struct StreamChunkRefIter<'a> {
    /// The chunk being iterated; used to look up the op of each row.
    chunk: &'a StreamChunk,
    /// Row iterator over the chunk's underlying data chunk.
    inner: DataChunkRefIter<'a>,
}
impl<'a> Iterator for StreamChunkRefIter<'a> {
    type Item = Record<RowRef<'a>>;

    /// Yields the next record.
    ///
    /// A `U-` row is paired with the row that follows it.
    ///
    /// # Panics
    ///
    /// - Panics if a `U+` row appears without a preceding `U-`.
    /// - Panics if the underlying row iterator is exhausted right after a `U-` row.
    /// - In debug builds, also panics if the row following a `U-` is not a `U+`.
    fn next(&mut self) -> Option<Self::Item> {
        let first = self.inner.next()?;
        // SAFETY: `first.index()` comes from the chunk's own row iterator. It is in bounds as
        // long as `ops()` holds one entry per data-chunk row; this `StreamChunk` invariant is
        // assumed here.
        let first_op = unsafe { *self.chunk.ops().get_unchecked(first.index()) };

        let record = match first_op {
            Op::Insert => Record::Insert { new_row: first },
            Op::Delete => Record::Delete { old_row: first },
            Op::UpdateInsert => panic!("expect a U- before U+"),
            Op::UpdateDelete => {
                let second = self.inner.next().expect("expect a row after U-");
                // SAFETY: same reasoning as above, applied to `second.index()`.
                let second_op = unsafe { *self.chunk.ops().get_unchecked(second.index()) };
                // This check runs only in debug builds. Release builds trust that a U+ follows.
                debug_assert_eq!(second_op, Op::UpdateInsert, "expect a U+ after U-");
                Record::Update {
                    old_row: first,
                    new_row: second,
                }
            }
        };
        Some(record)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Every record consumes either one or two rows. So there are at least half as many
        // records as rows, and never more records than rows.
        let (min_rows, max_rows) = self.inner.size_hint();
        (min_rows / 2, max_rows)
    }
}
#[cfg(test)]
mod tests {
    extern crate test;
    use itertools::Itertools;
    use test::Bencher;

    use crate::array::stream_record::Record;
    use crate::row::Row;
    use crate::test_utils::test_stream_chunk::{
        BigStreamChunk, TestStreamChunk, WhatEverStreamChunk,
    };

    /// `rows()` yields the first four `(op, row)` pairs of the test chunk in order.
    #[test]
    fn test_chunk_rows() {
        let test = WhatEverStreamChunk;
        let chunk = test.stream_chunk();
        let mut rows = chunk.rows().map(|(op, row)| (op, row.into_owned_row()));
        for idx in 0..4 {
            assert_eq!(Some(test.row_with_op_at(idx)), rows.next());
        }
    }

    /// Flattening `records()` back into rows reproduces the same first four `(op, row)` pairs.
    #[test]
    fn test_chunk_records() {
        let test = WhatEverStreamChunk;
        let chunk = test.stream_chunk();
        let mut rows = chunk
            .records()
            .flat_map(Record::into_rows)
            .map(|(op, row)| (op, row.into_owned_row()));
        for idx in 0..4 {
            assert_eq!(Some(test.row_with_op_at(idx)), rows.next());
        }
    }

    /// Cost of visiting every datum through `records()` flattened back into rows.
    #[bench]
    fn bench_rows_iterator_from_records(b: &mut Bencher) {
        let chunk = BigStreamChunk::new(10000).stream_chunk();
        b.iter(|| {
            chunk
                .records()
                .flat_map(Record::into_rows)
                .for_each(|(_op, row)| {
                    test::black_box(row.iter().count());
                })
        })
    }

    /// Cost of visiting every datum through `rows()`.
    #[bench]
    fn bench_rows_iterator(b: &mut Bencher) {
        let chunk = BigStreamChunk::new(10000).stream_chunk();
        b.iter(|| {
            chunk.rows().for_each(|(_op, row)| {
                test::black_box(row.iter().count());
            })
        })
    }

    /// Cost of materialising each row from `rows()` into a `Vec` of datum refs.
    #[bench]
    fn bench_rows_iterator_vec_of_datum_refs(b: &mut Bencher) {
        let chunk = BigStreamChunk::new(10000).stream_chunk();
        b.iter(|| {
            chunk.rows().for_each(|(_op, row)| {
                test::black_box(row.iter().collect_vec());
            })
        })
    }

    /// Cost of visiting every datum through `records()`, handling each record kind directly.
    #[bench]
    fn bench_record_iterator(b: &mut Bencher) {
        let chunk = BigStreamChunk::new(10000).stream_chunk();
        b.iter(|| {
            chunk.records().for_each(|record| match record {
                Record::Insert { new_row } => {
                    test::black_box(new_row.iter().count());
                }
                Record::Delete { old_row } => {
                    test::black_box(old_row.iter().count());
                }
                Record::Update { old_row, new_row } => {
                    test::black_box(old_row.iter().count());
                    test::black_box(new_row.iter().count());
                }
            })
        })
    }
}