risingwave_stream/executor/
values.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::vec;
16
17use risingwave_common::array::{DataChunk, Op};
18use risingwave_common::ensure;
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_expr::expr::NonStrictExpression;
21use tokio::sync::mpsc::UnboundedReceiver;
22
23use crate::executor::prelude::*;
24use crate::task::CreateMviewProgressReporter;
25
/// Upper bound on the number of rows packed into a single emitted chunk.
const DEFAULT_CHUNK_SIZE: usize = 1024;
27
28/// Execute `values` in stream. As is a leaf, current workaround holds a `barrier_executor`.
29/// May refractor with `BarrierRecvExecutor` in the near future.
30pub struct ValuesExecutor {
31    ctx: ActorContextRef,
32
33    schema: Schema,
34    // Receiver of barrier channel.
35    barrier_receiver: UnboundedReceiver<Barrier>,
36    progress: CreateMviewProgressReporter,
37
38    rows: vec::IntoIter<Vec<NonStrictExpression>>,
39}
40
41impl ValuesExecutor {
42    /// Currently hard-code the `pk_indices` as the last column.
43    pub fn new(
44        ctx: ActorContextRef,
45        schema: Schema,
46        progress: CreateMviewProgressReporter,
47        rows: Vec<Vec<NonStrictExpression>>,
48        barrier_receiver: UnboundedReceiver<Barrier>,
49    ) -> Self {
50        Self {
51            ctx,
52            schema,
53            progress,
54            barrier_receiver,
55            rows: rows.into_iter(),
56        }
57    }
58
59    #[try_stream(ok = Message, error = StreamExecutorError)]
60    async fn execute_inner(self) {
61        let Self {
62            schema,
63            mut progress,
64            mut barrier_receiver,
65            mut rows,
66            ..
67        } = self;
68        let barrier = barrier_receiver
69            .recv()
70            .instrument_await("values_executor_recv_first_barrier")
71            .await
72            .unwrap();
73
74        let emit = barrier.is_newly_added(self.ctx.id);
75        let paused_on_startup = barrier.is_pause_on_startup();
76
77        yield Message::Barrier(barrier);
78
79        // If it's failover, do not evaluate rows (assume they have been yielded)
80        if emit {
81            if paused_on_startup {
82                // Wait for the data stream to be resumed before yielding the chunks.
83                while let Some(barrier) = barrier_receiver.recv().await {
84                    let is_resume = barrier.is_resume();
85                    yield Message::Barrier(barrier);
86                    if is_resume {
87                        break;
88                    }
89                }
90            }
91
92            let cardinality = schema.len();
93            ensure!(cardinality > 0);
94            while !rows.is_empty() {
95                // We need a one row chunk rather than an empty chunk because constant
96                // expression's eval result is same size as input chunk
97                // cardinality.
98                let one_row_chunk = DataChunk::new_dummy(1);
99
100                let chunk_size = DEFAULT_CHUNK_SIZE.min(rows.len());
101                let mut array_builders = schema.create_array_builders(chunk_size);
102                for row in rows.by_ref().take(chunk_size) {
103                    for (expr, builder) in row.into_iter().zip_eq_fast(&mut array_builders) {
104                        let out = expr.eval_infallible(&one_row_chunk).await;
105                        builder.append_array(&out);
106                    }
107                }
108
109                let columns: Vec<_> = array_builders
110                    .into_iter()
111                    .map(|b| b.finish().into())
112                    .collect();
113
114                let chunk = DataChunk::new(columns, chunk_size);
115                let ops = vec![Op::Insert; chunk_size];
116
117                let stream_chunk = StreamChunk::from_parts(ops, chunk);
118                yield Message::Chunk(stream_chunk);
119            }
120        }
121
122        while let Some(barrier) = barrier_receiver.recv().await {
123            if emit {
124                progress.finish(barrier.epoch, 0);
125            }
126            yield Message::Barrier(barrier);
127        }
128    }
129}
130
131impl Execute for ValuesExecutor {
132    fn execute(self: Box<Self>) -> BoxedMessageStream {
133        self.execute_inner().boxed()
134    }
135}
136
#[cfg(test)]
mod tests {

    use futures::StreamExt;
    use risingwave_common::array::{
        Array, ArrayImpl, I16Array, I32Array, I64Array, StructArray, StructValue,
    };
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, ScalarImpl, StructType};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_expr::expr::{BoxedExpression, LiteralExpression, NonStrictExpression};
    use tokio::sync::mpsc::unbounded_channel;

    use super::ValuesExecutor;
    use crate::executor::test_utils::StreamExecutorTestExt;
    use crate::executor::{ActorContext, AddMutation, Barrier, Execute, Mutation};
    use crate::task::CreateMviewProgressReporter;
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;

    /// Feeds one row of literal expressions through a `ValuesExecutor` and checks that it
    /// forwards the first barrier, emits the row as a chunk, then forwards later barriers.
    #[tokio::test]
    async fn test_values() {
        let test_env = LocalBarrierTestEnv::for_test().await;
        let barrier_manager = test_env.local_barrier_manager.clone();
        let progress = CreateMviewProgressReporter::for_test(barrier_manager);
        let actor_id = progress.actor_id();
        let (tx, barrier_receiver) = unbounded_channel();

        let struct_type =
            StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]);
        let struct_value = StructValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]);

        // Helper that builds a boxed literal expression of the given type and value.
        let literal = |data_type: DataType, value: ScalarImpl| -> BoxedExpression {
            Box::new(LiteralExpression::new(data_type, Some(value)))
        };
        let exprs = vec![
            literal(DataType::Int16, ScalarImpl::Int16(1)),
            literal(DataType::Int32, ScalarImpl::Int32(2)),
            literal(DataType::Int64, ScalarImpl::Int64(3)),
            literal(struct_type.clone().into(), ScalarImpl::Struct(struct_value)),
            literal(DataType::Int64, ScalarImpl::Int64(0)),
        ];

        // One unnamed field per column, typed after the expression producing it.
        let schema: Schema = exprs
            .iter()
            .map(|expr| Field::unnamed(expr.return_type()))
            .collect();
        let row: Vec<NonStrictExpression> = exprs
            .into_iter()
            .map(NonStrictExpression::for_test)
            .collect();

        let executor = ValuesExecutor::new(
            ActorContext::for_test(actor_id),
            schema,
            progress,
            vec![row],
            barrier_receiver,
        );
        let mut values_executor = Box::new(executor).execute();

        // Init barrier: it adds this actor, so the executor emits its rows.
        let add_mutation = AddMutation {
            adds: Default::default(),
            added_actors: maplit::hashset! {actor_id},
            splits: Default::default(),
            pause: false,
            subscriptions_to_add: vec![],
            backfill_nodes_to_pause: Default::default(),
        };
        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(add_mutation));
        tx.send(init_barrier).unwrap();

        assert!(matches!(
            values_executor.next_unwrap_ready_barrier().unwrap(),
            Barrier { .. }
        ));

        // The message after the first barrier is the chunk holding the evaluated row.
        let chunk_msg = values_executor.next().await.unwrap().unwrap();
        let result = chunk_msg
            .into_chunk()
            .unwrap()
            .compact()
            .data_chunk()
            .to_owned();

        let expected_struct: ArrayImpl = StructArray::new(
            struct_type,
            vec![
                I32Array::from_iter([1]).into_ref(),
                I32Array::from_iter([2]).into_ref(),
                I32Array::from_iter([3]).into_ref(),
            ],
            [true].into_iter().collect(),
        )
        .into();

        let expected_columns = [
            I16Array::from_iter([1]).into_ref(),
            I32Array::from_iter([2]).into_ref(),
            I64Array::from_iter([3]).into_ref(),
            expected_struct.into(),
            I64Array::from_iter([0]).into_ref(),
        ];
        for (idx, expected) in expected_columns.into_iter().enumerate() {
            assert_eq!(*result.column_at(idx), expected);
        }

        // ValuesExecutor should simply forward following barriers
        for epoch in [2, 3] {
            tx.send(Barrier::new_test_barrier(test_epoch(epoch)))
                .unwrap();

            assert!(matches!(
                values_executor.next_unwrap_ready_barrier().unwrap(),
                Barrier { .. }
            ));
        }
    }
}
263}