risingwave_stream/executor/
changelog.rs1use core::fmt::Formatter;
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use futures::prelude::stream::StreamExt;
20use futures_async_stream::try_stream;
21use risingwave_common::array::{ArrayImpl, I16Array, Op, SerialArray, StreamChunk};
22
23use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, Message, StreamExecutorError};
24
25pub struct ChangeLogExecutor {
26 _ctx: ActorContextRef,
27 input: Executor,
28 need_op: bool,
29}
30
31impl Debug for ChangeLogExecutor {
32 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33 f.debug_struct("ChangeLogExecutor").finish()
34 }
35}
36
37impl Execute for ChangeLogExecutor {
38 fn execute(self: Box<Self>) -> BoxedMessageStream {
39 self.execute_inner().boxed()
40 }
41}
42impl ChangeLogExecutor {
43 pub fn new(ctx: ActorContextRef, input: Executor, need_op: bool) -> Self {
44 Self {
45 _ctx: ctx,
46 input,
47 need_op,
48 }
49 }
50
51 #[try_stream(ok = Message, error = StreamExecutorError)]
52 async fn execute_inner(self) {
53 let input = self.input.execute();
54 #[for_await]
55 for msg in input {
56 let msg = msg?;
57 match msg {
58 Message::Chunk(chunk) => {
59 let (ops, mut columns, bitmap) = chunk.into_inner();
60 let new_ops = vec![Op::Insert; ops.len()];
61 let changelog_row_id_array = Arc::new(ArrayImpl::Serial(
63 SerialArray::from_iter(std::iter::repeat_n(None, ops.len())),
64 ));
65 let new_chunk = if self.need_op {
66 let ops: Vec<Option<i16>> =
67 ops.iter().map(|op| Some(op.to_i16())).collect();
68 let ops_array = Arc::new(ArrayImpl::Int16(I16Array::from_iter(ops)));
69 columns.push(ops_array);
70 columns.push(changelog_row_id_array);
71 StreamChunk::with_visibility(new_ops, columns, bitmap)
72 } else {
73 columns.push(changelog_row_id_array);
74 StreamChunk::with_visibility(new_ops, columns, bitmap)
75 };
76 yield Message::Chunk(new_chunk);
77 }
78 Message::Watermark(_w) => {}
79 m => yield m,
80 }
81 }
82 }
83}