risingwave_stream/executor/
changelog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::fmt::Formatter;
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use futures::prelude::stream::StreamExt;
20use futures_async_stream::try_stream;
21use risingwave_common::array::{ArrayImpl, I16Array, Op, SerialArray, StreamChunk};
22
23use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, Message, StreamExecutorError};
24
25pub struct ChangeLogExecutor {
26    _ctx: ActorContextRef,
27    input: Executor,
28    need_op: bool,
29}
30
31impl Debug for ChangeLogExecutor {
32    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33        f.debug_struct("ChangeLogExecutor").finish()
34    }
35}
36
37impl Execute for ChangeLogExecutor {
38    fn execute(self: Box<Self>) -> BoxedMessageStream {
39        self.execute_inner().boxed()
40    }
41}
42impl ChangeLogExecutor {
43    pub fn new(ctx: ActorContextRef, input: Executor, need_op: bool) -> Self {
44        Self {
45            _ctx: ctx,
46            input,
47            need_op,
48        }
49    }
50
51    #[try_stream(ok = Message, error = StreamExecutorError)]
52    async fn execute_inner(self) {
53        let input = self.input.execute();
54        #[for_await]
55        for msg in input {
56            let msg = msg?;
57            match msg {
58                Message::Chunk(chunk) => {
59                    let (ops, mut columns, bitmap) = chunk.into_inner();
60                    let new_ops = vec![Op::Insert; ops.len()];
61                    // They are all 0, will be add in row id gen executor.
62                    let changelog_row_id_array = Arc::new(ArrayImpl::Serial(
63                        SerialArray::from_iter(std::iter::repeat_n(None, ops.len())),
64                    ));
65                    let new_chunk = if self.need_op {
66                        let ops: Vec<Option<i16>> =
67                            ops.iter().map(|op| Some(op.to_i16())).collect();
68                        let ops_array = Arc::new(ArrayImpl::Int16(I16Array::from_iter(ops)));
69                        columns.push(ops_array);
70                        columns.push(changelog_row_id_array);
71                        StreamChunk::with_visibility(new_ops, columns, bitmap)
72                    } else {
73                        columns.push(changelog_row_id_array);
74                        StreamChunk::with_visibility(new_ops, columns, bitmap)
75                    };
76                    yield Message::Chunk(new_chunk);
77                }
78                Message::Watermark(_w) => {}
79                m => yield m,
80            }
81        }
82    }
83}