risingwave_stream/executor/
troublemaker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use rand::Rng;
16use risingwave_common::array::Op;
17use risingwave_common::array::stream_record::{Record, RecordType};
18use risingwave_common::field_generator::{FieldGeneratorImpl, VarcharProperty};
19use risingwave_common::util::iter_util::ZipEqFast;
20use smallvec::SmallVec;
21
22use crate::consistency::insane;
23use crate::executor::prelude::*;
24
/// [`TroublemakerExecutor`] is used to make some trouble in the stream graph. Specifically,
/// it is attached to `StreamScan` and `Source` executors in **insane mode**. It randomly
/// corrupts the stream chunks it receives and sends them downstream, making the stream
/// inconsistent. This should ONLY BE USED IN INSANE MODE FOR TESTING PURPOSES.
///
/// Corruption is applied record by record: the `Op` may be flipped (only once a `Delete` or
/// `Update` has been observed in the input), and each datum may be replaced with `None` or
/// with a randomly generated value. Barriers and all other non-chunk messages are forwarded
/// unchanged.
pub struct TroublemakerExecutor {
    /// Upstream executor whose messages are consumed and corrupted.
    input: Executor,
    /// Settings that are fixed at construction time and only read during execution.
    inner: Inner,
}
33
/// Settings of a [`TroublemakerExecutor`] that are fixed at construction time.
struct Inner {
    /// Chunk size passed to the `StreamChunkBuilder` that assembles the output chunks.
    chunk_size: usize,
}
37
/// Mutable state carried across messages while the executor is running.
struct Vars {
    /// Collects the (possibly corrupted) rows and turns them into output chunks.
    chunk_builder: StreamChunkBuilder,
    /// Set to `true` as soon as a `Delete` or `Update` record has been seen in the input.
    /// `Op`s are only ever flipped after that point, because the append-only property of
    /// the stream is not known to the executor.
    met_delete_before: bool,
    /// One entry per input column, in schema order: the random value generator used to
    /// replace datums of that column, or `None` when no generator is available for it.
    field_generators: Box<[Option<FieldGeneratorImpl>]>,
}
43
44impl TroublemakerExecutor {
45    pub fn new(input: Executor, chunk_size: usize) -> Self {
46        assert!(insane(), "we should only make trouble in insane mode");
47        tracing::info!("we got a troublemaker");
48        Self {
49            input,
50            inner: Inner { chunk_size },
51        }
52    }
53
54    #[try_stream(ok = Message, error = StreamExecutorError)]
55    async fn execute_inner(self) {
56        let Self { input, inner: this } = self;
57
58        let mut field_generators = vec![];
59        for data_type in input.schema().data_types() {
60            let field_gen = match data_type {
61                t @ (DataType::Int16
62                | DataType::Int32
63                | DataType::Int64
64                | DataType::Float32
65                | DataType::Float64) => {
66                    FieldGeneratorImpl::with_number_random(t, None, None, rand::random()).ok()
67                }
68                DataType::Varchar => Some(FieldGeneratorImpl::with_varchar(
69                    &VarcharProperty::RandomVariableLength,
70                    rand::random(),
71                )),
72                _ => None,
73            };
74            field_generators.push(field_gen);
75        }
76
77        let mut vars = Vars {
78            chunk_builder: StreamChunkBuilder::new(this.chunk_size, input.schema().data_types()),
79            met_delete_before: false,
80            field_generators: field_generators.into_boxed_slice(),
81        };
82
83        #[for_await]
84        for msg in input.execute() {
85            let msg = msg?;
86            match msg {
87                Message::Chunk(chunk) => {
88                    for record in chunk.records() {
89                        if matches!(
90                            record.to_record_type(),
91                            RecordType::Delete | RecordType::Update
92                        ) {
93                            vars.met_delete_before = true;
94                        }
95
96                        for (op, row) in Self::make_some_trouble(&this, &mut vars, record) {
97                            if let Some(chunk) = vars.chunk_builder.append_row(op, row) {
98                                yield Message::Chunk(chunk);
99                            }
100                        }
101                    }
102
103                    if let Some(chunk) = vars.chunk_builder.take() {
104                        yield Message::Chunk(chunk);
105                    }
106                }
107                Message::Barrier(barrier) => {
108                    assert!(
109                        vars.chunk_builder.take().is_none(),
110                        "we don't merge chunks across barriers"
111                    );
112                    yield Message::Barrier(barrier);
113                }
114                _ => yield msg,
115            }
116        }
117    }
118
119    fn make_some_trouble<'a>(
120        _this: &'a Inner,
121        vars: &'a mut Vars,
122        record: Record<impl Row>,
123    ) -> SmallVec<[(Op, OwnedRow); 2]> {
124        let record = if vars.met_delete_before && rand::rng().random_bool(0.5) {
125            // Change the `Op`.
126            // Because we don't know the `append_only` property of the stream, we can't
127            // generate `Delete` arbitrarily. So we just generate `Delete` after we saw
128            // `Delete` or `Update` before.
129            match record {
130                Record::Insert { new_row } => Record::Delete {
131                    old_row: new_row.into_owned_row(),
132                },
133                Record::Delete { old_row } => Record::Insert {
134                    new_row: old_row.into_owned_row(),
135                },
136                Record::Update { old_row, new_row } => Record::Update {
137                    old_row: new_row.into_owned_row(),
138                    new_row: old_row.into_owned_row(),
139                },
140            }
141        } else {
142            // Just convert the rows to owned rows, without changing the `Op`.
143            match record {
144                Record::Insert { new_row } => Record::Insert {
145                    new_row: new_row.into_owned_row(),
146                },
147                Record::Delete { old_row } => Record::Delete {
148                    old_row: old_row.into_owned_row(),
149                },
150                Record::Update { old_row, new_row } => Record::Update {
151                    old_row: old_row.into_owned_row(),
152                    new_row: new_row.into_owned_row(),
153                },
154            }
155        };
156
157        record
158            .into_rows()
159            .map(|(op, row)| {
160                let mut data = row.into_inner();
161
162                for (datum, r#gen) in data
163                    .iter_mut()
164                    .zip_eq_fast(vars.field_generators.iter_mut())
165                {
166                    match rand::rng().random_range(0..4) {
167                        0 | 1 => {
168                            // don't change the value
169                        }
170                        2 => {
171                            *datum = None;
172                        }
173                        3 => {
174                            *datum = r#gen
175                                .as_mut()
176                                .and_then(|r#gen| r#gen.generate_datum(rand::random()))
177                                .or(datum.take());
178                        }
179                        _ => unreachable!(),
180                    }
181                }
182
183                (op, OwnedRow::new(data.into_vec()))
184            })
185            .collect()
186    }
187}
188
189impl Execute for TroublemakerExecutor {
190    fn execute(self: Box<Self>) -> BoxedMessageStream {
191        self.execute_inner().boxed()
192    }
193}