risingwave_stream/executor/
values.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::vec;
16
17use risingwave_common::array::{DataChunk, Op};
18use risingwave_common::ensure;
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_expr::expr::NonStrictExpression;
21use tokio::sync::mpsc::UnboundedReceiver;
22
23use crate::executor::prelude::*;
24use crate::task::CreateMviewProgressReporter;
25
/// Upper bound on the number of rows packed into one emitted chunk.
const DEFAULT_CHUNK_SIZE: usize = 1 << 10;
27
28/// Execute `values` in stream. As is a leaf, current workaround holds a `barrier_executor`.
29/// May refactor with `BarrierRecvExecutor` in the near future.
30pub struct ValuesExecutor {
31    ctx: ActorContextRef,
32
33    schema: Schema,
34    // Receiver of barrier channel.
35    barrier_receiver: UnboundedReceiver<Barrier>,
36    progress: CreateMviewProgressReporter,
37
38    rows: vec::IntoIter<Vec<NonStrictExpression>>,
39}
40
41impl ValuesExecutor {
42    pub fn new(
43        ctx: ActorContextRef,
44        schema: Schema,
45        progress: CreateMviewProgressReporter,
46        rows: Vec<Vec<NonStrictExpression>>,
47        barrier_receiver: UnboundedReceiver<Barrier>,
48    ) -> Self {
49        Self {
50            ctx,
51            schema,
52            progress,
53            barrier_receiver,
54            rows: rows.into_iter(),
55        }
56    }
57
58    #[try_stream(ok = Message, error = StreamExecutorError)]
59    async fn execute_inner(self) {
60        let Self {
61            schema,
62            mut progress,
63            mut barrier_receiver,
64            mut rows,
65            ..
66        } = self;
67        let barrier = barrier_receiver
68            .recv()
69            .instrument_await("values_executor_recv_first_barrier")
70            .await
71            .unwrap();
72
73        let emit = barrier.is_newly_added(self.ctx.id);
74        let paused_on_startup = barrier.is_pause_on_startup();
75
76        yield Message::Barrier(barrier);
77
78        // If it's failover, do not evaluate rows (assume they have been yielded)
79        if emit {
80            if paused_on_startup {
81                // Wait for the data stream to be resumed before yielding the chunks.
82                while let Some(barrier) = barrier_receiver.recv().await {
83                    let is_resume = barrier.is_resume();
84                    yield Message::Barrier(barrier);
85                    if is_resume {
86                        break;
87                    }
88                }
89            }
90
91            let cardinality = schema.len();
92            ensure!(cardinality > 0);
93            while !rows.is_empty() {
94                // We need a one row chunk rather than an empty chunk because constant
95                // expression's eval result is same size as input chunk
96                // cardinality.
97                let one_row_chunk = DataChunk::new_dummy(1);
98
99                let chunk_size = DEFAULT_CHUNK_SIZE.min(rows.len());
100                let mut array_builders = schema.create_array_builders(chunk_size);
101                for row in rows.by_ref().take(chunk_size) {
102                    for (expr, builder) in row.into_iter().zip_eq_fast(&mut array_builders) {
103                        let out = expr.eval_infallible(&one_row_chunk).await;
104                        builder.append_array(&out);
105                    }
106                }
107
108                let columns: Vec<_> = array_builders
109                    .into_iter()
110                    .map(|b| b.finish().into())
111                    .collect();
112
113                let chunk = DataChunk::new(columns, chunk_size);
114                let ops = vec![Op::Insert; chunk_size];
115
116                let stream_chunk = StreamChunk::from_parts(ops, chunk);
117                yield Message::Chunk(stream_chunk);
118            }
119        }
120
121        let mut finish_reported = !emit;
122
123        while let Some(barrier) = barrier_receiver.recv().await {
124            if !finish_reported {
125                progress.finish(barrier.epoch, 0);
126                finish_reported = true;
127            }
128            yield Message::Barrier(barrier);
129        }
130    }
131}
132
133impl Execute for ValuesExecutor {
134    fn execute(self: Box<Self>) -> BoxedMessageStream {
135        self.execute_inner().boxed()
136    }
137}
138
#[cfg(test)]
mod tests {

    use futures::StreamExt;
    use risingwave_common::array::{
        Array, ArrayImpl, I16Array, I32Array, I64Array, Op, StructArray, StructValue,
    };
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, ScalarImpl, StructType};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_expr::expr::{BoxedExpression, LiteralExpression, NonStrictExpression};
    use tokio::sync::mpsc::unbounded_channel;

    use super::ValuesExecutor;
    use crate::executor::test_utils::StreamExecutorTestExt;
    use crate::executor::{ActorContext, AddMutation, Barrier, Execute, Mutation};
    use crate::task::CreateMviewProgressReporter;
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;

    /// Checks that a newly added `ValuesExecutor` emits its single row as one `Insert` chunk
    /// right after the first barrier, and then simply forwards later barriers.
    #[tokio::test]
    async fn test_values() {
        let test_env = LocalBarrierTestEnv::for_test().await;
        let barrier_manager = test_env.local_barrier_manager.clone();
        let progress = CreateMviewProgressReporter::for_test(barrier_manager);
        let actor_id = progress.actor_id();
        let (tx, barrier_receiver) = unbounded_channel();
        let value = StructValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]);
        let exprs = vec![
            Box::new(LiteralExpression::new(
                DataType::Int16,
                Some(ScalarImpl::Int16(1)),
            )) as BoxedExpression,
            Box::new(LiteralExpression::new(
                DataType::Int32,
                Some(ScalarImpl::Int32(2)),
            )),
            Box::new(LiteralExpression::new(
                DataType::Int64,
                Some(ScalarImpl::Int64(3)),
            )),
            Box::new(LiteralExpression::new(
                StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]).into(),
                Some(ScalarImpl::Struct(value)),
            )),
            Box::new(LiteralExpression::new(
                DataType::Int64,
                Some(ScalarImpl::Int64(0)),
            )),
        ];
        let schema = exprs
            .iter() // for each column
            .map(|col| Field::unnamed(col.return_type()))
            .collect::<Schema>();
        let values_executor_struct = ValuesExecutor::new(
            ActorContext::for_test(actor_id),
            schema,
            progress,
            vec![
                exprs
                    .into_iter()
                    .map(NonStrictExpression::for_test)
                    .collect(),
            ],
            barrier_receiver,
        );
        let mut values_executor = Box::new(values_executor_struct).execute();

        // Init barrier: it newly adds this actor, so the executor must emit its rows.
        let first_message =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                added_actors: maplit::hashset! {actor_id},
                ..Default::default()
            }));
        tx.send(first_message).unwrap();

        let barrier = values_executor.next_unwrap_ready_barrier().unwrap();
        assert_eq!(barrier.epoch.curr, test_epoch(1));

        // Consume the chunk produced from the `VALUES` rows.
        let values_msg = values_executor.next().await.unwrap().unwrap();

        let chunk = values_msg.into_chunk().unwrap().compact_vis();
        // Exactly one row, inserted.
        assert_eq!(chunk.ops(), &[Op::Insert]);
        let result = chunk.data_chunk().to_owned();

        let array: ArrayImpl = StructArray::new(
            StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]),
            vec![
                I32Array::from_iter([1]).into_ref(),
                I32Array::from_iter([2]).into_ref(),
                I32Array::from_iter([3]).into_ref(),
            ],
            [true].into_iter().collect(),
        )
        .into();

        assert_eq!(*result.column_at(0), I16Array::from_iter([1]).into_ref());
        assert_eq!(*result.column_at(1), I32Array::from_iter([2]).into_ref());
        assert_eq!(*result.column_at(2), I64Array::from_iter([3]).into_ref());
        assert_eq!(*result.column_at(3), array.into());
        assert_eq!(*result.column_at(4), I64Array::from_iter([0]).into_ref());

        // ValuesExecutor should simply forward following barriers.
        tx.send(Barrier::new_test_barrier(test_epoch(2))).unwrap();

        let barrier = values_executor.next_unwrap_ready_barrier().unwrap();
        assert_eq!(barrier.epoch.curr, test_epoch(2));

        tx.send(Barrier::new_test_barrier(test_epoch(3))).unwrap();

        let barrier = values_executor.next_unwrap_ready_barrier().unwrap();
        assert_eq!(barrier.epoch.curr, test_epoch(3));
    }
}
261}