risingwave_stream/executor/
values.rs1use std::vec;
16
17use risingwave_common::array::{DataChunk, Op};
18use risingwave_common::ensure;
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_expr::expr::NonStrictExpression;
21use tokio::sync::mpsc::UnboundedReceiver;
22
23use crate::executor::prelude::*;
24use crate::task::CreateMviewProgressReporter;
25
26const DEFAULT_CHUNK_SIZE: usize = 1024;
27
28pub struct ValuesExecutor {
31 ctx: ActorContextRef,
32
33 schema: Schema,
34 barrier_receiver: UnboundedReceiver<Barrier>,
36 progress: CreateMviewProgressReporter,
37
38 rows: vec::IntoIter<Vec<NonStrictExpression>>,
39}
40
41impl ValuesExecutor {
42 pub fn new(
43 ctx: ActorContextRef,
44 schema: Schema,
45 progress: CreateMviewProgressReporter,
46 rows: Vec<Vec<NonStrictExpression>>,
47 barrier_receiver: UnboundedReceiver<Barrier>,
48 ) -> Self {
49 Self {
50 ctx,
51 schema,
52 progress,
53 barrier_receiver,
54 rows: rows.into_iter(),
55 }
56 }
57
58 #[try_stream(ok = Message, error = StreamExecutorError)]
59 async fn execute_inner(self) {
60 let Self {
61 schema,
62 mut progress,
63 mut barrier_receiver,
64 mut rows,
65 ..
66 } = self;
67 let barrier = barrier_receiver
68 .recv()
69 .instrument_await("values_executor_recv_first_barrier")
70 .await
71 .unwrap();
72
73 let emit = barrier.is_newly_added(self.ctx.id);
74 let paused_on_startup = barrier.is_pause_on_startup();
75
76 yield Message::Barrier(barrier);
77
78 if emit {
80 if paused_on_startup {
81 while let Some(barrier) = barrier_receiver.recv().await {
83 let is_resume = barrier.is_resume();
84 yield Message::Barrier(barrier);
85 if is_resume {
86 break;
87 }
88 }
89 }
90
91 let cardinality = schema.len();
92 ensure!(cardinality > 0);
93 while !rows.is_empty() {
94 let one_row_chunk = DataChunk::new_dummy(1);
98
99 let chunk_size = DEFAULT_CHUNK_SIZE.min(rows.len());
100 let mut array_builders = schema.create_array_builders(chunk_size);
101 for row in rows.by_ref().take(chunk_size) {
102 for (expr, builder) in row.into_iter().zip_eq_fast(&mut array_builders) {
103 let out = expr.eval_infallible(&one_row_chunk).await;
104 builder.append_array(&out);
105 }
106 }
107
108 let columns: Vec<_> = array_builders
109 .into_iter()
110 .map(|b| b.finish().into())
111 .collect();
112
113 let chunk = DataChunk::new(columns, chunk_size);
114 let ops = vec![Op::Insert; chunk_size];
115
116 let stream_chunk = StreamChunk::from_parts(ops, chunk);
117 yield Message::Chunk(stream_chunk);
118 }
119 }
120
121 while let Some(barrier) = barrier_receiver.recv().await {
122 if emit {
123 progress.finish(barrier.epoch, 0);
124 }
125 yield Message::Barrier(barrier);
126 }
127 }
128}
129
130impl Execute for ValuesExecutor {
131 fn execute(self: Box<Self>) -> BoxedMessageStream {
132 self.execute_inner().boxed()
133 }
134}
135
136#[cfg(test)]
137mod tests {
138
139 use futures::StreamExt;
140 use risingwave_common::array::{
141 Array, ArrayImpl, I16Array, I32Array, I64Array, StructArray, StructValue,
142 };
143 use risingwave_common::catalog::{Field, Schema};
144 use risingwave_common::types::{DataType, ScalarImpl, StructType};
145 use risingwave_common::util::epoch::test_epoch;
146 use risingwave_expr::expr::{BoxedExpression, LiteralExpression, NonStrictExpression};
147 use tokio::sync::mpsc::unbounded_channel;
148
149 use super::ValuesExecutor;
150 use crate::executor::test_utils::StreamExecutorTestExt;
151 use crate::executor::{ActorContext, AddMutation, Barrier, Execute, Mutation};
152 use crate::task::CreateMviewProgressReporter;
153 use crate::task::barrier_test_utils::LocalBarrierTestEnv;
154
155 #[tokio::test]
156 async fn test_values() {
157 let test_env = LocalBarrierTestEnv::for_test().await;
158 let barrier_manager = test_env.local_barrier_manager.clone();
159 let progress = CreateMviewProgressReporter::for_test(barrier_manager);
160 let actor_id = progress.actor_id();
161 let (tx, barrier_receiver) = unbounded_channel();
162 let value = StructValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]);
163 let exprs = vec![
164 Box::new(LiteralExpression::new(
165 DataType::Int16,
166 Some(ScalarImpl::Int16(1)),
167 )) as BoxedExpression,
168 Box::new(LiteralExpression::new(
169 DataType::Int32,
170 Some(ScalarImpl::Int32(2)),
171 )),
172 Box::new(LiteralExpression::new(
173 DataType::Int64,
174 Some(ScalarImpl::Int64(3)),
175 )),
176 Box::new(LiteralExpression::new(
177 StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]).into(),
178 Some(ScalarImpl::Struct(value)),
179 )),
180 Box::new(LiteralExpression::new(
181 DataType::Int64,
182 Some(ScalarImpl::Int64(0)),
183 )),
184 ];
185 let schema = exprs
186 .iter() .map(|col| Field::unnamed(col.return_type()))
188 .collect::<Schema>();
189 let values_executor_struct = ValuesExecutor::new(
190 ActorContext::for_test(actor_id),
191 schema,
192 progress,
193 vec![
194 exprs
195 .into_iter()
196 .map(NonStrictExpression::for_test)
197 .collect(),
198 ],
199 barrier_receiver,
200 );
201 let mut values_executor = Box::new(values_executor_struct).execute();
202
203 let first_message =
205 Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
206 added_actors: maplit::hashset! {actor_id},
207 ..Default::default()
208 }));
209 tx.send(first_message).unwrap();
210
211 assert!(matches!(
212 values_executor.next_unwrap_ready_barrier().unwrap(),
213 Barrier { .. }
214 ));
215
216 let values_msg = values_executor.next().await.unwrap().unwrap();
218
219 let result = values_msg
220 .into_chunk()
221 .unwrap()
222 .compact_vis()
223 .data_chunk()
224 .to_owned();
225
226 let array: ArrayImpl = StructArray::new(
227 StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]),
228 vec![
229 I32Array::from_iter([1]).into_ref(),
230 I32Array::from_iter([2]).into_ref(),
231 I32Array::from_iter([3]).into_ref(),
232 ],
233 [true].into_iter().collect(),
234 )
235 .into();
236
237 assert_eq!(*result.column_at(0), I16Array::from_iter([1]).into_ref());
238 assert_eq!(*result.column_at(1), I32Array::from_iter([2]).into_ref());
239 assert_eq!(*result.column_at(2), I64Array::from_iter([3]).into_ref());
240 assert_eq!(*result.column_at(3), array.into());
241 assert_eq!(*result.column_at(4), I64Array::from_iter([0]).into_ref());
242
243 tx.send(Barrier::new_test_barrier(test_epoch(2))).unwrap();
245
246 assert!(matches!(
247 values_executor.next_unwrap_ready_barrier().unwrap(),
248 Barrier { .. }
249 ));
250
251 tx.send(Barrier::new_test_barrier(test_epoch(3))).unwrap();
252
253 assert!(matches!(
254 values_executor.next_unwrap_ready_barrier().unwrap(),
255 Barrier { .. }
256 ));
257 }
258}