risingwave_stream/executor/
troublemaker.rs1use rand::Rng;
16use risingwave_common::array::Op;
17use risingwave_common::array::stream_record::{Record, RecordType};
18use risingwave_common::field_generator::{FieldGeneratorImpl, VarcharProperty};
19use risingwave_common::util::iter_util::ZipEqFast;
20use smallvec::SmallVec;
21
22use crate::consistency::insane;
23use crate::executor::prelude::*;
24
25pub struct TroublemakerExecutor {
30 input: Executor,
31 inner: Inner,
32}
33
/// Immutable configuration of a [`TroublemakerExecutor`].
struct Inner {
    /// Value handed to `StreamChunkBuilder::new` when the output chunk builder
    /// is created.
    // NOTE(review): presumably the maximum number of rows per emitted chunk — confirm.
    chunk_size: usize,
}
37
38struct Vars {
39 chunk_builder: StreamChunkBuilder,
40 met_delete_before: bool,
41 field_generators: Box<[Option<FieldGeneratorImpl>]>,
42}
43
44impl TroublemakerExecutor {
45 pub fn new(input: Executor, chunk_size: usize) -> Self {
46 assert!(insane(), "we should only make trouble in insane mode");
47 tracing::info!("we got a troublemaker");
48 Self {
49 input,
50 inner: Inner { chunk_size },
51 }
52 }
53
54 #[try_stream(ok = Message, error = StreamExecutorError)]
55 async fn execute_inner(self) {
56 let Self { input, inner: this } = self;
57
58 let mut field_generators = vec![];
59 for data_type in input.schema().data_types() {
60 let field_gen = match data_type {
61 t @ (DataType::Int16
62 | DataType::Int32
63 | DataType::Int64
64 | DataType::Float32
65 | DataType::Float64) => {
66 FieldGeneratorImpl::with_number_random(t, None, None, rand::random()).ok()
67 }
68 DataType::Varchar => Some(FieldGeneratorImpl::with_varchar(
69 &VarcharProperty::RandomVariableLength,
70 rand::random(),
71 )),
72 _ => None,
73 };
74 field_generators.push(field_gen);
75 }
76
77 let mut vars = Vars {
78 chunk_builder: StreamChunkBuilder::new(this.chunk_size, input.schema().data_types()),
79 met_delete_before: false,
80 field_generators: field_generators.into_boxed_slice(),
81 };
82
83 #[for_await]
84 for msg in input.execute() {
85 let msg = msg?;
86 match msg {
87 Message::Chunk(chunk) => {
88 for record in chunk.records() {
89 if matches!(
90 record.to_record_type(),
91 RecordType::Delete | RecordType::Update
92 ) {
93 vars.met_delete_before = true;
94 }
95
96 for (op, row) in Self::make_some_trouble(&this, &mut vars, record) {
97 if let Some(chunk) = vars.chunk_builder.append_row(op, row) {
98 yield Message::Chunk(chunk);
99 }
100 }
101 }
102
103 if let Some(chunk) = vars.chunk_builder.take() {
104 yield Message::Chunk(chunk);
105 }
106 }
107 Message::Barrier(barrier) => {
108 assert!(
109 vars.chunk_builder.take().is_none(),
110 "we don't merge chunks across barriers"
111 );
112 yield Message::Barrier(barrier);
113 }
114 _ => yield msg,
115 }
116 }
117 }
118
119 fn make_some_trouble<'a>(
120 _this: &'a Inner,
121 vars: &'a mut Vars,
122 record: Record<impl Row>,
123 ) -> SmallVec<[(Op, OwnedRow); 2]> {
124 let record = if vars.met_delete_before && rand::rng().random_bool(0.5) {
125 match record {
130 Record::Insert { new_row } => Record::Delete {
131 old_row: new_row.into_owned_row(),
132 },
133 Record::Delete { old_row } => Record::Insert {
134 new_row: old_row.into_owned_row(),
135 },
136 Record::Update { old_row, new_row } => Record::Update {
137 old_row: new_row.into_owned_row(),
138 new_row: old_row.into_owned_row(),
139 },
140 }
141 } else {
142 match record {
144 Record::Insert { new_row } => Record::Insert {
145 new_row: new_row.into_owned_row(),
146 },
147 Record::Delete { old_row } => Record::Delete {
148 old_row: old_row.into_owned_row(),
149 },
150 Record::Update { old_row, new_row } => Record::Update {
151 old_row: old_row.into_owned_row(),
152 new_row: new_row.into_owned_row(),
153 },
154 }
155 };
156
157 record
158 .into_rows()
159 .map(|(op, row)| {
160 let mut data = row.into_inner();
161
162 for (datum, r#gen) in data
163 .iter_mut()
164 .zip_eq_fast(vars.field_generators.iter_mut())
165 {
166 match rand::rng().random_range(0..4) {
167 0 | 1 => {
168 }
170 2 => {
171 *datum = None;
172 }
173 3 => {
174 *datum = r#gen
175 .as_mut()
176 .and_then(|r#gen| r#gen.generate_datum(rand::random()))
177 .or(datum.take());
178 }
179 _ => unreachable!(),
180 }
181 }
182
183 (op, OwnedRow::new(data.into_vec()))
184 })
185 .collect()
186 }
187}
188
189impl Execute for TroublemakerExecutor {
190 fn execute(self: Box<Self>) -> BoxedMessageStream {
191 self.execute_inner().boxed()
192 }
193}