risingwave_stream/executor/
values.rs1use std::vec;
16
17use risingwave_common::array::{DataChunk, Op};
18use risingwave_common::ensure;
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_expr::expr::NonStrictExpression;
21use tokio::sync::mpsc::UnboundedReceiver;
22
23use crate::executor::prelude::*;
24use crate::task::CreateMviewProgressReporter;
25
26const DEFAULT_CHUNK_SIZE: usize = 1024;
27
28pub struct ValuesExecutor {
31 ctx: ActorContextRef,
32
33 schema: Schema,
34 barrier_receiver: UnboundedReceiver<Barrier>,
36 progress: CreateMviewProgressReporter,
37
38 rows: vec::IntoIter<Vec<NonStrictExpression>>,
39}
40
41impl ValuesExecutor {
42 pub fn new(
44 ctx: ActorContextRef,
45 schema: Schema,
46 progress: CreateMviewProgressReporter,
47 rows: Vec<Vec<NonStrictExpression>>,
48 barrier_receiver: UnboundedReceiver<Barrier>,
49 ) -> Self {
50 Self {
51 ctx,
52 schema,
53 progress,
54 barrier_receiver,
55 rows: rows.into_iter(),
56 }
57 }
58
59 #[try_stream(ok = Message, error = StreamExecutorError)]
60 async fn execute_inner(self) {
61 let Self {
62 schema,
63 mut progress,
64 mut barrier_receiver,
65 mut rows,
66 ..
67 } = self;
68 let barrier = barrier_receiver
69 .recv()
70 .instrument_await("values_executor_recv_first_barrier")
71 .await
72 .unwrap();
73
74 let emit = barrier.is_newly_added(self.ctx.id);
75 let paused_on_startup = barrier.is_pause_on_startup();
76
77 yield Message::Barrier(barrier);
78
79 if emit {
81 if paused_on_startup {
82 while let Some(barrier) = barrier_receiver.recv().await {
84 let is_resume = barrier.is_resume();
85 yield Message::Barrier(barrier);
86 if is_resume {
87 break;
88 }
89 }
90 }
91
92 let cardinality = schema.len();
93 ensure!(cardinality > 0);
94 while !rows.is_empty() {
95 let one_row_chunk = DataChunk::new_dummy(1);
99
100 let chunk_size = DEFAULT_CHUNK_SIZE.min(rows.len());
101 let mut array_builders = schema.create_array_builders(chunk_size);
102 for row in rows.by_ref().take(chunk_size) {
103 for (expr, builder) in row.into_iter().zip_eq_fast(&mut array_builders) {
104 let out = expr.eval_infallible(&one_row_chunk).await;
105 builder.append_array(&out);
106 }
107 }
108
109 let columns: Vec<_> = array_builders
110 .into_iter()
111 .map(|b| b.finish().into())
112 .collect();
113
114 let chunk = DataChunk::new(columns, chunk_size);
115 let ops = vec![Op::Insert; chunk_size];
116
117 let stream_chunk = StreamChunk::from_parts(ops, chunk);
118 yield Message::Chunk(stream_chunk);
119 }
120 }
121
122 while let Some(barrier) = barrier_receiver.recv().await {
123 if emit {
124 progress.finish(barrier.epoch, 0);
125 }
126 yield Message::Barrier(barrier);
127 }
128 }
129}
130
131impl Execute for ValuesExecutor {
132 fn execute(self: Box<Self>) -> BoxedMessageStream {
133 self.execute_inner().boxed()
134 }
135}
136
137#[cfg(test)]
138mod tests {
139
140 use futures::StreamExt;
141 use risingwave_common::array::{
142 Array, ArrayImpl, I16Array, I32Array, I64Array, StructArray, StructValue,
143 };
144 use risingwave_common::catalog::{Field, Schema};
145 use risingwave_common::types::{DataType, ScalarImpl, StructType};
146 use risingwave_common::util::epoch::test_epoch;
147 use risingwave_expr::expr::{BoxedExpression, LiteralExpression, NonStrictExpression};
148 use tokio::sync::mpsc::unbounded_channel;
149
150 use super::ValuesExecutor;
151 use crate::executor::test_utils::StreamExecutorTestExt;
152 use crate::executor::{ActorContext, AddMutation, Barrier, Execute, Mutation};
153 use crate::task::CreateMviewProgressReporter;
154 use crate::task::barrier_test_utils::LocalBarrierTestEnv;
155
156 #[tokio::test]
157 async fn test_values() {
158 let test_env = LocalBarrierTestEnv::for_test().await;
159 let barrier_manager = test_env.local_barrier_manager.clone();
160 let progress = CreateMviewProgressReporter::for_test(barrier_manager);
161 let actor_id = progress.actor_id();
162 let (tx, barrier_receiver) = unbounded_channel();
163 let value = StructValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]);
164 let exprs = vec![
165 Box::new(LiteralExpression::new(
166 DataType::Int16,
167 Some(ScalarImpl::Int16(1)),
168 )) as BoxedExpression,
169 Box::new(LiteralExpression::new(
170 DataType::Int32,
171 Some(ScalarImpl::Int32(2)),
172 )),
173 Box::new(LiteralExpression::new(
174 DataType::Int64,
175 Some(ScalarImpl::Int64(3)),
176 )),
177 Box::new(LiteralExpression::new(
178 StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]).into(),
179 Some(ScalarImpl::Struct(value)),
180 )),
181 Box::new(LiteralExpression::new(
182 DataType::Int64,
183 Some(ScalarImpl::Int64(0)),
184 )),
185 ];
186 let schema = exprs
187 .iter() .map(|col| Field::unnamed(col.return_type()))
189 .collect::<Schema>();
190 let values_executor_struct = ValuesExecutor::new(
191 ActorContext::for_test(actor_id),
192 schema,
193 progress,
194 vec![
195 exprs
196 .into_iter()
197 .map(NonStrictExpression::for_test)
198 .collect(),
199 ],
200 barrier_receiver,
201 );
202 let mut values_executor = Box::new(values_executor_struct).execute();
203
204 let first_message =
206 Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
207 adds: Default::default(),
208 added_actors: maplit::hashset! {actor_id},
209 splits: Default::default(),
210 pause: false,
211 subscriptions_to_add: vec![],
212 backfill_nodes_to_pause: Default::default(),
213 }));
214 tx.send(first_message).unwrap();
215
216 assert!(matches!(
217 values_executor.next_unwrap_ready_barrier().unwrap(),
218 Barrier { .. }
219 ));
220
221 let values_msg = values_executor.next().await.unwrap().unwrap();
223
224 let result = values_msg
225 .into_chunk()
226 .unwrap()
227 .compact()
228 .data_chunk()
229 .to_owned();
230
231 let array: ArrayImpl = StructArray::new(
232 StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]),
233 vec![
234 I32Array::from_iter([1]).into_ref(),
235 I32Array::from_iter([2]).into_ref(),
236 I32Array::from_iter([3]).into_ref(),
237 ],
238 [true].into_iter().collect(),
239 )
240 .into();
241
242 assert_eq!(*result.column_at(0), I16Array::from_iter([1]).into_ref());
243 assert_eq!(*result.column_at(1), I32Array::from_iter([2]).into_ref());
244 assert_eq!(*result.column_at(2), I64Array::from_iter([3]).into_ref());
245 assert_eq!(*result.column_at(3), array.into());
246 assert_eq!(*result.column_at(4), I64Array::from_iter([0]).into_ref());
247
248 tx.send(Barrier::new_test_barrier(test_epoch(2))).unwrap();
250
251 assert!(matches!(
252 values_executor.next_unwrap_ready_barrier().unwrap(),
253 Barrier { .. }
254 ));
255
256 tx.send(Barrier::new_test_barrier(test_epoch(3))).unwrap();
257
258 assert!(matches!(
259 values_executor.next_unwrap_ready_barrier().unwrap(),
260 Barrier { .. }
261 ));
262 }
263}