risingwave_stream/executor/
row_id_gen.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::LazyLock;
16
17use risingwave_common::array::{Array, ArrayBuilder, ArrayRef, Op, SerialArrayBuilder};
18use risingwave_common::bitmap::Bitmap;
19use risingwave_common::hash::VnodeBitmapExt;
20use risingwave_common::log::LogSuppressor;
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_common::util::row_id::{RowIdGenerator, compute_vnode_from_row_id};
23use risingwave_storage::table::check_vnode_is_set;
24
25use crate::executor::prelude::*;
26
27/// [`RowIdGenExecutor`] generates row id for data, where the user has not specified a pk.
28pub struct RowIdGenExecutor {
29    ctx: ActorContextRef,
30
31    upstream: Option<Executor>,
32
33    row_id_index: usize,
34
35    row_id_generator: RowIdGenerator,
36
37    vnodes: Arc<Bitmap>,
38}
39
40impl RowIdGenExecutor {
41    pub fn new(
42        ctx: ActorContextRef,
43        upstream: Executor,
44        row_id_index: usize,
45        vnodes: Bitmap,
46    ) -> Self {
47        Self {
48            ctx,
49            upstream: Some(upstream),
50            row_id_index,
51            row_id_generator: Self::new_generator(&vnodes),
52            vnodes: Arc::new(vnodes),
53        }
54    }
55
56    /// Create a new row id generator based on the assigned vnodes.
57    fn new_generator(vnodes: &Bitmap) -> RowIdGenerator {
58        RowIdGenerator::new(vnodes.iter_vnodes(), vnodes.len())
59    }
60
61    /// Generate a row ID column according to ops.
62    fn gen_row_id_column_by_op(
63        &mut self,
64        column: &ArrayRef,
65        ops: &'_ [Op],
66        vis: &Bitmap,
67    ) -> ArrayRef {
68        let column = column.as_serial();
69        let len = column.len();
70        let mut builder = SerialArrayBuilder::new(len);
71
72        for ((serial, op), vis) in column.iter().zip_eq_fast(ops).zip_eq_fast(vis.iter()) {
73            macro_rules! warn_absent_row_id {
74                ($op:expr) => {{
75                    static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
76                        LazyLock::new(LogSuppressor::default);
77                    if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
78                        tracing::warn!(
79                            suppressed_count,
80                            ?op,
81                            "absent row id for non-insert operation"
82                        );
83                    }
84                    None
85                }};
86            }
87
88            let row_id = if vis {
89                match serial {
90                    // If the row id is already present, e.g., from `UPDATE` or `DELETE`, keep it.
91                    Some(serial) => {
92                        // Check if the data is shuffled correctly.
93                        if cfg!(debug_assertions) {
94                            check_vnode_is_set(
95                                compute_vnode_from_row_id(serial.as_row_id(), self.vnodes.len()),
96                                &self.vnodes,
97                            );
98                        }
99                        Some(serial)
100                    }
101                    // Otherwise, generate a new row id only if it's an `Insert` operation.
102                    None => match op {
103                        Op::Insert => Some(self.row_id_generator.next().into()),
104                        Op::Delete => warn_absent_row_id!(op),
105                        Op::UpdateDelete => warn_absent_row_id!(op),
106                        Op::UpdateInsert => warn_absent_row_id!(op),
107                    },
108                }
109            } else {
110                None
111            };
112
113            builder.append(row_id);
114        }
115
116        builder.finish().into_ref()
117    }
118
119    #[try_stream(ok = Message, error = StreamExecutorError)]
120    async fn execute_inner(mut self) {
121        let mut upstream = self.upstream.take().unwrap().execute();
122
123        // The first barrier mush propagated.
124        let barrier = expect_first_barrier(&mut upstream).await?;
125        yield Message::Barrier(barrier);
126
127        #[for_await]
128        for msg in upstream {
129            let msg = msg?;
130
131            match msg {
132                Message::Chunk(chunk) => {
133                    // For chunk message, we fill the row id column and then yield it.
134                    let (ops, mut columns, bitmap) = chunk.into_inner();
135                    columns[self.row_id_index] =
136                        self.gen_row_id_column_by_op(&columns[self.row_id_index], &ops, &bitmap);
137                    yield Message::Chunk(StreamChunk::with_visibility(ops, columns, bitmap));
138                }
139                Message::Barrier(barrier) => {
140                    // Update row id generator if vnode mapping changes.
141                    // Note that: since `Update` barrier only exists between `Pause` and `Resume`
142                    // barrier, duplicated row id won't be generated.
143                    if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) {
144                        self.row_id_generator = Self::new_generator(&vnodes);
145                        self.vnodes = vnodes;
146                    }
147                    yield Message::Barrier(barrier);
148                }
149                Message::Watermark(watermark) => yield Message::Watermark(watermark),
150            }
151        }
152    }
153}
154
155impl Execute for RowIdGenExecutor {
156    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
157        self.execute_inner().boxed()
158    }
159}
160
#[cfg(test)]
mod tests {
    use risingwave_common::array::PrimitiveArray;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::types::Serial;
    use risingwave_common::util::epoch::test_epoch;

    use super::*;
    use crate::executor::test_utils::MockSource;

    // Awaits the next message of the given stream and unwraps it down to a chunk.
    macro_rules! next_chunk {
        ($stream:expr) => {
            $stream
                .next()
                .await
                .unwrap()
                .unwrap()
                .into_chunk()
                .unwrap()
        };
    }

    /// Feeds insert, update and delete chunks through the executor and checks that row ids
    /// are generated for inserts and preserved for updates and deletes.
    #[tokio::test]
    async fn test_row_id_gen_executor() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let schema = Schema::new(vec![
            Field::unnamed(DataType::Serial),
            Field::unnamed(DataType::Int64),
        ]);
        let stream_key = vec![0];
        let row_id_index = 0;
        // Every vnode is assigned to the executor under test.
        let all_vnodes = Bitmap::ones(VirtualNode::COUNT_FOR_TEST);
        let (mut tx, upstream) = MockSource::channel();
        let upstream = upstream.into_executor(schema.clone(), stream_key.clone());

        let mut executor_stream = RowIdGenExecutor::new(
            ActorContext::for_test(233),
            upstream,
            row_id_index,
            all_vnodes,
        )
        .boxed()
        .execute();

        // Init barrier
        tx.push_barrier(test_epoch(1), false);
        executor_stream.next().await.unwrap().unwrap();

        // Insert operation
        let insert_chunk = StreamChunk::from_pretty(
            " SRL I
            + . 1
            + . 2
            + . 6
            + . 7",
        );
        tx.push_chunk(insert_chunk);
        let output: StreamChunk = next_chunk!(executor_stream);
        let row_id_col: &PrimitiveArray<Serial> = output.column_at(row_id_index).as_serial();
        for row_id in row_id_col.iter() {
            // Should generate row id for insert operations.
            assert!(row_id.is_some());
        }

        // Update operation
        let update_chunk = StreamChunk::from_pretty(
            "      SRL        I
            U- 32874283748  1
            U+ 32874283748 999",
        );
        tx.push_chunk(update_chunk);
        let output: StreamChunk = next_chunk!(executor_stream);
        let row_id_col: &PrimitiveArray<Serial> = output.column_at(row_id_index).as_serial();
        // Should not generate row id for update operations.
        assert_eq!(row_id_col.value_at(0).unwrap(), Serial::from(32874283748));
        assert_eq!(row_id_col.value_at(1).unwrap(), Serial::from(32874283748));

        // Delete operation
        let delete_chunk = StreamChunk::from_pretty(
            "      SRL       I
            - 84629409685  1",
        );
        tx.push_chunk(delete_chunk);
        let output: StreamChunk = next_chunk!(executor_stream);
        let row_id_col: &PrimitiveArray<Serial> = output.column_at(row_id_index).as_serial();
        // Should not generate row id for delete operations.
        assert_eq!(row_id_col.value_at(0).unwrap(), Serial::from(84629409685));
    }
}