// risingwave_stream/executor/row_id_gen.rs
use risingwave_common::array::stream_chunk::Ops;
use risingwave_common::array::{Array, ArrayBuilder, ArrayRef, Op, SerialArrayBuilder};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::types::Serial;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::row_id::RowIdGenerator;
use crate::executor::prelude::*;
/// Stream executor that rewrites one column of every upstream chunk with row IDs.
///
/// Rows whose op is `Op::Insert` receive a newly generated ID; all other rows keep the
/// value already present in that column (see `gen_row_id_column_by_op`).
pub struct RowIdGenExecutor {
    /// Actor context; its `id` is used to look up vnode bitmap updates carried by barriers.
    ctx: ActorContextRef,

    /// Upstream executor. Wrapped in `Option` so that `execute_inner` can `take()` it
    /// when it starts consuming the stream.
    upstream: Option<Executor>,

    /// Index of the column that is replaced in every chunk.
    row_id_index: usize,

    /// Generator producing the IDs for inserted rows. Rebuilt whenever a barrier carries a
    /// new vnode bitmap for this actor.
    row_id_generator: RowIdGenerator,
}
impl RowIdGenExecutor {
pub fn new(
ctx: ActorContextRef,
upstream: Executor,
row_id_index: usize,
vnodes: Bitmap,
) -> Self {
Self {
ctx,
upstream: Some(upstream),
row_id_index,
row_id_generator: Self::new_generator(&vnodes),
}
}
fn new_generator(vnodes: &Bitmap) -> RowIdGenerator {
RowIdGenerator::new(vnodes.iter_vnodes(), vnodes.len())
}
fn gen_row_id_column_by_op(
&mut self,
column: &ArrayRef,
ops: Ops<'_>,
vis: &Bitmap,
) -> ArrayRef {
let len = column.len();
let mut builder = SerialArrayBuilder::new(len);
for ((datum, op), vis) in column.iter().zip_eq_fast(ops).zip_eq_fast(vis.iter()) {
match op {
Op::Insert => builder.append(Some(self.row_id_generator.next().into())),
_ => {
if vis {
builder.append(Some(Serial::try_from(datum.unwrap()).unwrap()))
} else {
builder.append(None)
}
}
}
}
builder.finish().into_ref()
}
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self) {
let mut upstream = self.upstream.take().unwrap().execute();
let barrier = expect_first_barrier(&mut upstream).await?;
yield Message::Barrier(barrier);
#[for_await]
for msg in upstream {
let msg = msg?;
match msg {
Message::Chunk(chunk) => {
let (ops, mut columns, bitmap) = chunk.into_inner();
columns[self.row_id_index] =
self.gen_row_id_column_by_op(&columns[self.row_id_index], &ops, &bitmap);
yield Message::Chunk(StreamChunk::with_visibility(ops, columns, bitmap));
}
Message::Barrier(barrier) => {
if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) {
self.row_id_generator = Self::new_generator(&vnodes);
}
yield Message::Barrier(barrier);
}
Message::Watermark(watermark) => yield Message::Watermark(watermark),
}
}
}
}
impl Execute for RowIdGenExecutor {
    /// Consumes the boxed executor and returns the boxed stream produced by
    /// `execute_inner`.
    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
        let stream = self.execute_inner();
        stream.boxed()
    }
}
#[cfg(test)]
mod tests {
    use risingwave_common::array::PrimitiveArray;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::test_epoch;

    use super::*;
    use crate::executor::test_utils::MockSource;

    /// Feeds insert, update and delete chunks through the executor and checks that inserted
    /// rows end up with a non-null row ID while update/delete rows keep the ID they came with.
    #[tokio::test]
    async fn test_row_id_gen_executor() {
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let row_id_index = 0;
        let pk_indices = vec![0];
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Serial),
            Field::unnamed(DataType::Int64),
        ]);
        // The executor owns every vnode in this test.
        let vnodes = Bitmap::ones(VirtualNode::COUNT_FOR_TEST);

        let (mut tx, upstream) = MockSource::channel();
        let upstream = upstream.into_executor(schema.clone(), pk_indices.clone());
        let mut stream = RowIdGenExecutor::new(
            ActorContext::for_test(233),
            upstream,
            row_id_index,
            vnodes,
        )
        .boxed()
        .execute();

        // The initial barrier has to be consumed before any chunk comes out.
        tx.push_barrier(test_epoch(1), false);
        stream.next().await.unwrap().unwrap();

        // Inserts: every row must come out with a row ID filled in.
        let insert_chunk = StreamChunk::from_pretty(
            " SRL I
+ . 1
+ . 2
+ . 6
+ . 7",
        );
        tx.push_chunk(insert_chunk);
        let chunk: StreamChunk = stream
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_chunk()
            .unwrap();
        let row_id_col: &PrimitiveArray<Serial> = chunk.column_at(row_id_index).as_serial();
        for row_id in row_id_col.iter() {
            assert!(row_id.is_some());
        }

        // Updates: both halves of the update keep the incoming row ID.
        let update_chunk = StreamChunk::from_pretty(
            " SRL I
U- 32874283748 1
U+ 32874283748 999",
        );
        tx.push_chunk(update_chunk);
        let chunk: StreamChunk = stream
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_chunk()
            .unwrap();
        let row_id_col: &PrimitiveArray<Serial> = chunk.column_at(row_id_index).as_serial();
        assert_eq!(row_id_col.value_at(0).unwrap(), Serial::from(32874283748));
        assert_eq!(row_id_col.value_at(1).unwrap(), Serial::from(32874283748));

        // Deletes: the incoming row ID is preserved as well.
        let delete_chunk = StreamChunk::from_pretty(
            " SRL I
- 84629409685 1",
        );
        tx.push_chunk(delete_chunk);
        let chunk: StreamChunk = stream
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_chunk()
            .unwrap();
        let row_id_col: &PrimitiveArray<Serial> = chunk.column_at(row_id_index).as_serial();
        assert_eq!(row_id_col.value_at(0).unwrap(), Serial::from(84629409685));
    }
}