risingwave_stream/executor/
changelog.rs1use core::fmt::Formatter;
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use futures::prelude::stream::StreamExt;
20use futures_async_stream::try_stream;
21use risingwave_common::array::{ArrayImpl, I16Array, Op, SerialArray, StreamChunk};
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::hash::VirtualNode;
24use risingwave_common::types::Serial;
25use risingwave_common::util::row_id::ChangelogRowIdGenerator;
26
27use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, Message, StreamExecutorError};
28
29pub struct ChangeLogExecutor {
30 ctx: ActorContextRef,
31 input: Executor,
32 need_op: bool,
33 all_vnode_count: usize,
34 distribution_keys: Vec<usize>,
35 changelog_row_id_generator: ChangelogRowIdGenerator,
36}
37
38impl Debug for ChangeLogExecutor {
39 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("ChangeLogExecutor").finish()
41 }
42}
43
44impl Execute for ChangeLogExecutor {
45 fn execute(self: Box<Self>) -> BoxedMessageStream {
46 self.execute_inner().boxed()
47 }
48}
49impl ChangeLogExecutor {
50 pub fn new(
51 ctx: ActorContextRef,
52 input: Executor,
53 need_op: bool,
54 all_vnode_count: usize,
55 vnodes: Bitmap,
56 distribution_keys: Vec<usize>,
57 ) -> Self {
58 let changelog_row_id_generator = ChangelogRowIdGenerator::new(vnodes, all_vnode_count);
59 Self {
60 ctx,
61 input,
62 need_op,
63 all_vnode_count,
64 distribution_keys,
65 changelog_row_id_generator,
66 }
67 }
68
69 #[try_stream(ok = Message, error = StreamExecutorError)]
70 async fn execute_inner(mut self) {
71 let input = self.input.execute();
72 #[for_await]
73 for msg in input {
74 let msg = msg?;
75 match msg {
76 Message::Chunk(chunk) => {
77 let data_chunk = chunk.data_chunk();
78 let vnodes = VirtualNode::compute_chunk(
79 data_chunk,
80 &self.distribution_keys,
81 self.all_vnode_count,
82 );
83 let (ops, mut columns, bitmap) = chunk.into_inner();
84 let new_ops = vec![Op::Insert; ops.len()];
85 let changelog_row_ids = vnodes
86 .iter()
87 .map(|vnode| self.changelog_row_id_generator.next(vnode));
88 let changelog_row_id_array = Arc::new(ArrayImpl::Serial(
89 SerialArray::from_iter(changelog_row_ids.map(Serial::from)),
90 ));
91 let new_chunk = if self.need_op {
92 let ops: Vec<Option<i16>> =
93 ops.iter().map(|op| Some(op.to_i16())).collect();
94 let ops_array = Arc::new(ArrayImpl::Int16(I16Array::from_iter(ops)));
95 columns.push(ops_array);
96 columns.push(changelog_row_id_array);
97 StreamChunk::with_visibility(new_ops, columns, bitmap)
98 } else {
99 columns.push(changelog_row_id_array);
100 StreamChunk::with_visibility(new_ops, columns, bitmap)
101 };
102 yield Message::Chunk(new_chunk);
103 }
104 Message::Watermark(_w) => {}
105 Message::Barrier(barrier) => {
106 if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) {
107 self.changelog_row_id_generator = ChangelogRowIdGenerator::new(
108 vnodes.as_ref().clone(),
109 self.all_vnode_count,
110 );
111 }
112 yield Message::Barrier(barrier);
113 }
114 }
115 }
116 }
117}