risingwave_stream/executor/
row_id_gen.rs1use std::sync::LazyLock;
16
17use risingwave_common::array::{Array, ArrayBuilder, ArrayRef, Op, SerialArrayBuilder};
18use risingwave_common::bitmap::Bitmap;
19use risingwave_common::hash::VnodeBitmapExt;
20use risingwave_common::log::LogSuppressor;
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_common::util::row_id::{RowIdGenerator, compute_vnode_from_row_id};
23use risingwave_storage::table::check_vnode_is_set;
24
25use crate::executor::prelude::*;
26
27pub struct RowIdGenExecutor {
29 ctx: ActorContextRef,
30
31 upstream: Option<Executor>,
32
33 row_id_index: usize,
34
35 row_id_generator: RowIdGenerator,
36
37 vnodes: Arc<Bitmap>,
38}
39
40impl RowIdGenExecutor {
41 pub fn new(
42 ctx: ActorContextRef,
43 upstream: Executor,
44 row_id_index: usize,
45 vnodes: Bitmap,
46 ) -> Self {
47 Self {
48 ctx,
49 upstream: Some(upstream),
50 row_id_index,
51 row_id_generator: Self::new_generator(&vnodes),
52 vnodes: Arc::new(vnodes),
53 }
54 }
55
56 fn new_generator(vnodes: &Bitmap) -> RowIdGenerator {
58 RowIdGenerator::new(vnodes.iter_vnodes(), vnodes.len())
59 }
60
61 fn gen_row_id_column_by_op(
63 &mut self,
64 column: &ArrayRef,
65 ops: &'_ [Op],
66 vis: &Bitmap,
67 ) -> ArrayRef {
68 let column = column.as_serial();
69 let len = column.len();
70 let mut builder = SerialArrayBuilder::new(len);
71
72 for ((serial, op), vis) in column.iter().zip_eq_fast(ops).zip_eq_fast(vis.iter()) {
73 macro_rules! warn_absent_row_id {
74 ($op:expr) => {{
75 static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
76 LazyLock::new(LogSuppressor::default);
77 if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
78 tracing::warn!(
79 suppressed_count,
80 ?op,
81 "absent row id for non-insert operation"
82 );
83 }
84 None
85 }};
86 }
87
88 let row_id = if vis {
89 match serial {
90 Some(serial) => {
92 if cfg!(debug_assertions) {
94 check_vnode_is_set(
95 compute_vnode_from_row_id(serial.as_row_id(), self.vnodes.len()),
96 &self.vnodes,
97 );
98 }
99 Some(serial)
100 }
101 None => match op {
103 Op::Insert => Some(self.row_id_generator.next().into()),
104 Op::Delete => warn_absent_row_id!(op),
105 Op::UpdateDelete => warn_absent_row_id!(op),
106 Op::UpdateInsert => warn_absent_row_id!(op),
107 },
108 }
109 } else {
110 None
111 };
112
113 builder.append(row_id);
114 }
115
116 builder.finish().into_ref()
117 }
118
119 #[try_stream(ok = Message, error = StreamExecutorError)]
120 async fn execute_inner(mut self) {
121 let mut upstream = self.upstream.take().unwrap().execute();
122
123 let barrier = expect_first_barrier(&mut upstream).await?;
125 yield Message::Barrier(barrier);
126
127 #[for_await]
128 for msg in upstream {
129 let msg = msg?;
130
131 match msg {
132 Message::Chunk(chunk) => {
133 let (ops, mut columns, bitmap) = chunk.into_inner();
135 columns[self.row_id_index] =
136 self.gen_row_id_column_by_op(&columns[self.row_id_index], &ops, &bitmap);
137 yield Message::Chunk(StreamChunk::with_visibility(ops, columns, bitmap));
138 }
139 Message::Barrier(barrier) => {
140 if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) {
144 self.row_id_generator = Self::new_generator(&vnodes);
145 self.vnodes = vnodes;
146 }
147 yield Message::Barrier(barrier);
148 }
149 Message::Watermark(watermark) => yield Message::Watermark(watermark),
150 }
151 }
152 }
153}
154
155impl Execute for RowIdGenExecutor {
156 fn execute(self: Box<Self>) -> super::BoxedMessageStream {
157 self.execute_inner().boxed()
158 }
159}
160
#[cfg(test)]
mod tests {
    use risingwave_common::array::PrimitiveArray;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::types::Serial;
    use risingwave_common::util::epoch::test_epoch;

    use super::*;
    use crate::executor::test_utils::MockSource;

    // Awaits the next message from `$stream`, unwrapping it into a `StreamChunk`.
    macro_rules! next_chunk {
        ($stream:expr) => {
            $stream.next().await.unwrap().unwrap().into_chunk().unwrap()
        };
    }

    #[tokio::test]
    async fn test_row_id_gen_executor() {
        // The test is written against a vnode count of 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let row_id_index = 0;
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Serial),
            Field::unnamed(DataType::Int64),
        ]);
        let stream_key = vec![0];
        // This actor owns every vnode.
        let vnodes = Bitmap::ones(VirtualNode::COUNT_FOR_TEST);

        let (mut tx, upstream) = MockSource::channel();
        let upstream = upstream.into_executor(schema.clone(), stream_key.clone());
        let mut executor =
            RowIdGenExecutor::new(ActorContext::for_test(233), upstream, row_id_index, vnodes)
                .boxed()
                .execute();

        // Consume the initial barrier.
        tx.push_barrier(test_epoch(1), false);
        executor.next().await.unwrap().unwrap();

        // Inserts without row ids: every row must receive a generated id.
        tx.push_chunk(StreamChunk::from_pretty(
            " SRL I
            + . 1
            + . 2
            + . 6
            + . 7",
        ));
        let chunk: StreamChunk = next_chunk!(executor);
        let row_ids: &PrimitiveArray<Serial> = chunk.column_at(row_id_index).as_serial();
        for row_id in row_ids.iter() {
            assert!(row_id.is_some());
        }

        // An update pair that already carries row ids keeps them.
        tx.push_chunk(StreamChunk::from_pretty(
            " SRL I
            U- 32874283748 1
            U+ 32874283748 999",
        ));
        let chunk: StreamChunk = next_chunk!(executor);
        let row_ids: &PrimitiveArray<Serial> = chunk.column_at(row_id_index).as_serial();
        assert_eq!(row_ids.value_at(0).unwrap(), Serial::from(32874283748));
        assert_eq!(row_ids.value_at(1).unwrap(), Serial::from(32874283748));

        // A delete that already carries a row id keeps it.
        tx.push_chunk(StreamChunk::from_pretty(
            " SRL I
            - 84629409685 1",
        ));
        let chunk: StreamChunk = next_chunk!(executor);
        let row_ids: &PrimitiveArray<Serial> = chunk.column_at(row_id_index).as_serial();
        assert_eq!(row_ids.value_at(0).unwrap(), Serial::from(84629409685));
    }
}