risingwave_stream/executor/
changelog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::fmt::Formatter;
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use futures::prelude::stream::StreamExt;
20use futures_async_stream::try_stream;
21use risingwave_common::array::{ArrayImpl, I16Array, Op, SerialArray, StreamChunk};
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::hash::VirtualNode;
24use risingwave_common::types::Serial;
25use risingwave_common::util::row_id::ChangelogRowIdGenerator;
26
27use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, Message, StreamExecutorError};
28
/// Stream executor that rewrites its input into an insert-only changelog stream.
///
/// Every emitted row is an `Op::Insert`. Each row gets a generated changelog row id column
/// appended. When `need_op` is set, the row's original op is also appended as an extra column.
pub struct ChangeLogExecutor {
    // Actor context; its `id` selects vnode-bitmap updates carried by barriers.
    ctx: ActorContextRef,
    // Upstream executor whose message stream is rewritten.
    input: Executor,
    // Whether the original op of each row is emitted as an additional `Int16` column.
    need_op: bool,
    // Total vnode count, used to hash rows to vnodes and to build the row id generator.
    all_vnode_count: usize,
    // Indices of the columns hashed to compute each row's vnode.
    distribution_keys: Vec<usize>,
    // Produces one changelog row id per row from that row's vnode; rebuilt whenever a
    // barrier carries a vnode-bitmap update for this actor.
    changelog_row_id_generator: ChangelogRowIdGenerator,
}
37
38impl Debug for ChangeLogExecutor {
39    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("ChangeLogExecutor").finish()
41    }
42}
43
44impl Execute for ChangeLogExecutor {
45    fn execute(self: Box<Self>) -> BoxedMessageStream {
46        self.execute_inner().boxed()
47    }
48}
49impl ChangeLogExecutor {
50    pub fn new(
51        ctx: ActorContextRef,
52        input: Executor,
53        need_op: bool,
54        all_vnode_count: usize,
55        vnodes: Bitmap,
56        distribution_keys: Vec<usize>,
57    ) -> Self {
58        let changelog_row_id_generator = ChangelogRowIdGenerator::new(vnodes, all_vnode_count);
59        Self {
60            ctx,
61            input,
62            need_op,
63            all_vnode_count,
64            distribution_keys,
65            changelog_row_id_generator,
66        }
67    }
68
69    #[try_stream(ok = Message, error = StreamExecutorError)]
70    async fn execute_inner(mut self) {
71        let input = self.input.execute();
72        #[for_await]
73        for msg in input {
74            let msg = msg?;
75            match msg {
76                Message::Chunk(chunk) => {
77                    let data_chunk = chunk.data_chunk();
78                    let vnodes = VirtualNode::compute_chunk(
79                        data_chunk,
80                        &self.distribution_keys,
81                        self.all_vnode_count,
82                    );
83                    let (ops, mut columns, bitmap) = chunk.into_inner();
84                    let new_ops = vec![Op::Insert; ops.len()];
85                    let changelog_row_ids = vnodes
86                        .iter()
87                        .map(|vnode| self.changelog_row_id_generator.next(vnode));
88                    let changelog_row_id_array = Arc::new(ArrayImpl::Serial(
89                        SerialArray::from_iter(changelog_row_ids.map(Serial::from)),
90                    ));
91                    let new_chunk = if self.need_op {
92                        let ops: Vec<Option<i16>> =
93                            ops.iter().map(|op| Some(op.to_i16())).collect();
94                        let ops_array = Arc::new(ArrayImpl::Int16(I16Array::from_iter(ops)));
95                        columns.push(ops_array);
96                        columns.push(changelog_row_id_array);
97                        StreamChunk::with_visibility(new_ops, columns, bitmap)
98                    } else {
99                        columns.push(changelog_row_id_array);
100                        StreamChunk::with_visibility(new_ops, columns, bitmap)
101                    };
102                    yield Message::Chunk(new_chunk);
103                }
104                Message::Watermark(_w) => {}
105                Message::Barrier(barrier) => {
106                    if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) {
107                        self.changelog_row_id_generator = ChangelogRowIdGenerator::new(
108                            vnodes.as_ref().clone(),
109                            self.all_vnode_count,
110                        );
111                    }
112                    yield Message::Barrier(barrier);
113                }
114            }
115        }
116    }
117}