risingwave_stream/executor/
values.rs1use std::vec;
16
17use risingwave_common::array::{DataChunk, Op};
18use risingwave_common::ensure;
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_expr::expr::NonStrictExpression;
21use tokio::sync::mpsc::UnboundedReceiver;
22
23use crate::executor::prelude::*;
24use crate::task::CreateMviewProgressReporter;
25
26const DEFAULT_CHUNK_SIZE: usize = 1024;
27
28pub struct ValuesExecutor {
31 ctx: ActorContextRef,
32
33 schema: Schema,
34 barrier_receiver: UnboundedReceiver<Barrier>,
36 progress: CreateMviewProgressReporter,
37
38 rows: vec::IntoIter<Vec<NonStrictExpression>>,
39}
40
41impl ValuesExecutor {
42 pub fn new(
43 ctx: ActorContextRef,
44 schema: Schema,
45 progress: CreateMviewProgressReporter,
46 rows: Vec<Vec<NonStrictExpression>>,
47 barrier_receiver: UnboundedReceiver<Barrier>,
48 ) -> Self {
49 Self {
50 ctx,
51 schema,
52 progress,
53 barrier_receiver,
54 rows: rows.into_iter(),
55 }
56 }
57
58 #[try_stream(ok = Message, error = StreamExecutorError)]
59 async fn execute_inner(self) {
60 let Self {
61 schema,
62 mut progress,
63 mut barrier_receiver,
64 mut rows,
65 ..
66 } = self;
67 let barrier = barrier_receiver
68 .recv()
69 .instrument_await("values_executor_recv_first_barrier")
70 .await
71 .unwrap();
72
73 let emit = barrier.is_newly_added(self.ctx.id);
74 let paused_on_startup = barrier.is_pause_on_startup();
75
76 yield Message::Barrier(barrier);
77
78 if emit {
80 if paused_on_startup {
81 while let Some(barrier) = barrier_receiver.recv().await {
83 let is_resume = barrier.is_resume();
84 yield Message::Barrier(barrier);
85 if is_resume {
86 break;
87 }
88 }
89 }
90
91 let cardinality = schema.len();
92 ensure!(cardinality > 0);
93 while !rows.is_empty() {
94 let one_row_chunk = DataChunk::new_dummy(1);
98
99 let chunk_size = DEFAULT_CHUNK_SIZE.min(rows.len());
100 let mut array_builders = schema.create_array_builders(chunk_size);
101 for row in rows.by_ref().take(chunk_size) {
102 for (expr, builder) in row.into_iter().zip_eq_fast(&mut array_builders) {
103 let out = expr.eval_infallible(&one_row_chunk).await;
104 builder.append_array(&out);
105 }
106 }
107
108 let columns: Vec<_> = array_builders
109 .into_iter()
110 .map(|b| b.finish().into())
111 .collect();
112
113 let chunk = DataChunk::new(columns, chunk_size);
114 let ops = vec![Op::Insert; chunk_size];
115
116 let stream_chunk = StreamChunk::from_parts(ops, chunk);
117 yield Message::Chunk(stream_chunk);
118 }
119 }
120
121 let mut finish_reported = !emit;
122
123 while let Some(barrier) = barrier_receiver.recv().await {
124 if !finish_reported {
125 progress.finish(barrier.epoch, 0);
126 finish_reported = true;
127 }
128 yield Message::Barrier(barrier);
129 }
130 }
131}
132
133impl Execute for ValuesExecutor {
134 fn execute(self: Box<Self>) -> BoxedMessageStream {
135 self.execute_inner().boxed()
136 }
137}
138
139#[cfg(test)]
140mod tests {
141
142 use futures::StreamExt;
143 use risingwave_common::array::{
144 Array, ArrayImpl, I16Array, I32Array, I64Array, StructArray, StructValue,
145 };
146 use risingwave_common::catalog::{Field, Schema};
147 use risingwave_common::types::{DataType, ScalarImpl, StructType};
148 use risingwave_common::util::epoch::test_epoch;
149 use risingwave_expr::expr::{BoxedExpression, LiteralExpression, NonStrictExpression};
150 use tokio::sync::mpsc::unbounded_channel;
151
152 use super::ValuesExecutor;
153 use crate::executor::test_utils::StreamExecutorTestExt;
154 use crate::executor::{ActorContext, AddMutation, Barrier, Execute, Mutation};
155 use crate::task::CreateMviewProgressReporter;
156 use crate::task::barrier_test_utils::LocalBarrierTestEnv;
157
158 #[tokio::test]
159 async fn test_values() {
160 let test_env = LocalBarrierTestEnv::for_test().await;
161 let barrier_manager = test_env.local_barrier_manager.clone();
162 let progress = CreateMviewProgressReporter::for_test(barrier_manager);
163 let actor_id = progress.actor_id();
164 let (tx, barrier_receiver) = unbounded_channel();
165 let value = StructValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]);
166 let exprs = vec![
167 Box::new(LiteralExpression::new(
168 DataType::Int16,
169 Some(ScalarImpl::Int16(1)),
170 )) as BoxedExpression,
171 Box::new(LiteralExpression::new(
172 DataType::Int32,
173 Some(ScalarImpl::Int32(2)),
174 )),
175 Box::new(LiteralExpression::new(
176 DataType::Int64,
177 Some(ScalarImpl::Int64(3)),
178 )),
179 Box::new(LiteralExpression::new(
180 StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]).into(),
181 Some(ScalarImpl::Struct(value)),
182 )),
183 Box::new(LiteralExpression::new(
184 DataType::Int64,
185 Some(ScalarImpl::Int64(0)),
186 )),
187 ];
188 let schema = exprs
189 .iter() .map(|col| Field::unnamed(col.return_type()))
191 .collect::<Schema>();
192 let values_executor_struct = ValuesExecutor::new(
193 ActorContext::for_test(actor_id),
194 schema,
195 progress,
196 vec![
197 exprs
198 .into_iter()
199 .map(NonStrictExpression::for_test)
200 .collect(),
201 ],
202 barrier_receiver,
203 );
204 let mut values_executor = Box::new(values_executor_struct).execute();
205
206 let first_message =
208 Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
209 added_actors: maplit::hashset! {actor_id},
210 ..Default::default()
211 }));
212 tx.send(first_message).unwrap();
213
214 assert!(matches!(
215 values_executor.next_unwrap_ready_barrier().unwrap(),
216 Barrier { .. }
217 ));
218
219 let values_msg = values_executor.next().await.unwrap().unwrap();
221
222 let result = values_msg
223 .into_chunk()
224 .unwrap()
225 .compact_vis()
226 .data_chunk()
227 .to_owned();
228
229 let array: ArrayImpl = StructArray::new(
230 StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]),
231 vec![
232 I32Array::from_iter([1]).into_ref(),
233 I32Array::from_iter([2]).into_ref(),
234 I32Array::from_iter([3]).into_ref(),
235 ],
236 [true].into_iter().collect(),
237 )
238 .into();
239
240 assert_eq!(*result.column_at(0), I16Array::from_iter([1]).into_ref());
241 assert_eq!(*result.column_at(1), I32Array::from_iter([2]).into_ref());
242 assert_eq!(*result.column_at(2), I64Array::from_iter([3]).into_ref());
243 assert_eq!(*result.column_at(3), array.into());
244 assert_eq!(*result.column_at(4), I64Array::from_iter([0]).into_ref());
245
246 tx.send(Barrier::new_test_barrier(test_epoch(2))).unwrap();
248
249 assert!(matches!(
250 values_executor.next_unwrap_ready_barrier().unwrap(),
251 Barrier { .. }
252 ));
253
254 tx.send(Barrier::new_test_barrier(test_epoch(3))).unwrap();
255
256 assert!(matches!(
257 values_executor.next_unwrap_ready_barrier().unwrap(),
258 Barrier { .. }
259 ));
260 }
261}