risingwave_stream/executor/
values.rs1use std::vec;
16
17use risingwave_common::array::{DataChunk, Op};
18use risingwave_common::ensure;
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_expr::expr::NonStrictExpression;
21use tokio::sync::mpsc::UnboundedReceiver;
22
23use crate::executor::prelude::*;
24use crate::task::CreateMviewProgressReporter;
25
26const DEFAULT_CHUNK_SIZE: usize = 1024;
27
28pub struct ValuesExecutor {
31 ctx: ActorContextRef,
32
33 schema: Schema,
34 barrier_receiver: UnboundedReceiver<Barrier>,
36 progress: CreateMviewProgressReporter,
37
38 rows: vec::IntoIter<Vec<NonStrictExpression>>,
39}
40
41impl ValuesExecutor {
42 pub fn new(
44 ctx: ActorContextRef,
45 schema: Schema,
46 progress: CreateMviewProgressReporter,
47 rows: Vec<Vec<NonStrictExpression>>,
48 barrier_receiver: UnboundedReceiver<Barrier>,
49 ) -> Self {
50 Self {
51 ctx,
52 schema,
53 progress,
54 barrier_receiver,
55 rows: rows.into_iter(),
56 }
57 }
58
59 #[try_stream(ok = Message, error = StreamExecutorError)]
60 async fn execute_inner(self) {
61 let Self {
62 schema,
63 mut progress,
64 mut barrier_receiver,
65 mut rows,
66 ..
67 } = self;
68 let barrier = barrier_receiver
69 .recv()
70 .instrument_await("values_executor_recv_first_barrier")
71 .await
72 .unwrap();
73
74 let emit = barrier.is_newly_added(self.ctx.id);
75 let paused_on_startup = barrier.is_pause_on_startup();
76
77 yield Message::Barrier(barrier);
78
79 if emit {
81 if paused_on_startup {
82 while let Some(barrier) = barrier_receiver.recv().await {
84 let is_resume = barrier.is_resume();
85 yield Message::Barrier(barrier);
86 if is_resume {
87 break;
88 }
89 }
90 }
91
92 let cardinality = schema.len();
93 ensure!(cardinality > 0);
94 while !rows.is_empty() {
95 let one_row_chunk = DataChunk::new_dummy(1);
99
100 let chunk_size = DEFAULT_CHUNK_SIZE.min(rows.len());
101 let mut array_builders = schema.create_array_builders(chunk_size);
102 for row in rows.by_ref().take(chunk_size) {
103 for (expr, builder) in row.into_iter().zip_eq_fast(&mut array_builders) {
104 let out = expr.eval_infallible(&one_row_chunk).await;
105 builder.append_array(&out);
106 }
107 }
108
109 let columns: Vec<_> = array_builders
110 .into_iter()
111 .map(|b| b.finish().into())
112 .collect();
113
114 let chunk = DataChunk::new(columns, chunk_size);
115 let ops = vec![Op::Insert; chunk_size];
116
117 let stream_chunk = StreamChunk::from_parts(ops, chunk);
118 yield Message::Chunk(stream_chunk);
119 }
120 }
121
122 while let Some(barrier) = barrier_receiver.recv().await {
123 if emit {
124 progress.finish(barrier.epoch, 0);
125 }
126 yield Message::Barrier(barrier);
127 }
128 }
129}
130
131impl Execute for ValuesExecutor {
132 fn execute(self: Box<Self>) -> BoxedMessageStream {
133 self.execute_inner().boxed()
134 }
135}
136
#[cfg(test)]
mod tests {

    use futures::StreamExt;
    use risingwave_common::array::{
        Array, ArrayImpl, I16Array, I32Array, I64Array, StructArray, StructValue,
    };
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, ScalarImpl, StructType};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_expr::expr::{BoxedExpression, LiteralExpression, NonStrictExpression};
    use tokio::sync::mpsc::unbounded_channel;

    use super::ValuesExecutor;
    use crate::executor::test_utils::StreamExecutorTestExt;
    use crate::executor::{ActorContext, AddMutation, Barrier, Execute, Mutation};
    use crate::task::{CreateMviewProgressReporter, LocalBarrierManager};

    /// Boxes a literal expression of type `data_type` that evaluates to `value`.
    fn literal(data_type: DataType, value: ScalarImpl) -> BoxedExpression {
        Box::new(LiteralExpression::new(data_type, Some(value)))
    }

    /// Feeds one row of five literals through the executor and checks that it emits
    /// the first barrier, then a chunk holding exactly those values, then keeps
    /// forwarding the barriers sent afterwards.
    #[tokio::test]
    async fn test_values() {
        let progress = CreateMviewProgressReporter::for_test(LocalBarrierManager::for_test());
        let actor_id = progress.actor_id();
        let (tx, barrier_receiver) = unbounded_channel();

        // The struct column type is needed for the input literal and the expected array.
        let int32_triple =
            || StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]);

        let struct_value = StructValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]);
        let exprs = vec![
            literal(DataType::Int16, ScalarImpl::Int16(1)),
            literal(DataType::Int32, ScalarImpl::Int32(2)),
            literal(DataType::Int64, ScalarImpl::Int64(3)),
            literal(int32_triple().into(), ScalarImpl::Struct(struct_value)),
            literal(DataType::Int64, ScalarImpl::Int64(0)),
        ];
        let schema: Schema = exprs
            .iter()
            .map(|expr| Field::unnamed(expr.return_type()))
            .collect();
        let single_row = exprs
            .into_iter()
            .map(NonStrictExpression::for_test)
            .collect();

        let executor = ValuesExecutor::new(
            ActorContext::for_test(actor_id),
            schema,
            progress,
            vec![single_row],
            barrier_receiver,
        );
        let mut values_executor = Box::new(executor).execute();

        // The first barrier marks the actor as newly added, so the row must be emitted.
        let add_actor = Mutation::Add(AddMutation {
            adds: Default::default(),
            added_actors: maplit::hashset! {actor_id},
            splits: Default::default(),
            pause: false,
            subscriptions_to_add: vec![],
        });
        tx.send(Barrier::new_test_barrier(test_epoch(1)).with_mutation(add_actor))
            .unwrap();

        assert!(matches!(
            values_executor.next_unwrap_ready_barrier().unwrap(),
            Barrier { .. }
        ));

        let values_msg = values_executor.next().await.unwrap().unwrap();
        let compacted = values_msg.into_chunk().unwrap().compact();
        let result = compacted.data_chunk().to_owned();

        let expected_struct: ArrayImpl = StructArray::new(
            int32_triple(),
            vec![
                I32Array::from_iter([1]).into_ref(),
                I32Array::from_iter([2]).into_ref(),
                I32Array::from_iter([3]).into_ref(),
            ],
            [true].into_iter().collect(),
        )
        .into();

        assert_eq!(*result.column_at(0), I16Array::from_iter([1]).into_ref());
        assert_eq!(*result.column_at(1), I32Array::from_iter([2]).into_ref());
        assert_eq!(*result.column_at(2), I64Array::from_iter([3]).into_ref());
        assert_eq!(*result.column_at(3), expected_struct.into());
        assert_eq!(*result.column_at(4), I64Array::from_iter([0]).into_ref());

        // Barriers sent after the data must still be passed through, one by one.
        for epoch in [2, 3] {
            tx.send(Barrier::new_test_barrier(test_epoch(epoch))).unwrap();

            assert!(matches!(
                values_executor.next_unwrap_ready_barrier().unwrap(),
                Barrier { .. }
            ));
        }
    }
}