risingwave_stream/from_proto/
merge.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::try_join_all;
18use risingwave_pb::stream_plan::{DispatcherType, MergeNode};
19
20use super::*;
21use crate::executor::exchange::input::new_input;
22use crate::executor::monitor::StreamingMetrics;
23use crate::executor::{ActorContextRef, MergeExecutor, MergeExecutorInput, MergeExecutorUpstream};
24use crate::task::LocalBarrierManager;
25
/// Builder that constructs merge executors from a [`MergeNode`] stream plan node.
pub struct MergeExecutorBuilder;
27
28impl MergeExecutorBuilder {
29    pub(crate) async fn new_input(
30        local_barrier_manager: LocalBarrierManager,
31        executor_stats: Arc<StreamingMetrics>,
32        actor_context: ActorContextRef,
33        info: ExecutorInfo,
34        node: &MergeNode,
35        chunk_size: usize,
36    ) -> StreamResult<MergeExecutorInput> {
37        let upstream_fragment_id = node.get_upstream_fragment_id();
38
39        let inputs: Vec<_> = try_join_all(
40            actor_context
41                .initial_upstream_actors
42                .get(&node.upstream_fragment_id)
43                .map(|actors| actors.actors.iter())
44                .into_iter()
45                .flatten()
46                .map(|upstream_actor| {
47                    new_input(
48                        &local_barrier_manager,
49                        executor_stats.clone(),
50                        actor_context.id,
51                        actor_context.fragment_id,
52                        upstream_actor,
53                        upstream_fragment_id,
54                    )
55                }),
56        )
57        .await?;
58
59        // If there's always only one upstream, we can use `ReceiverExecutor`. Note that it can't
60        // scale to multiple upstreams.
61        let always_single_input = match node.get_upstream_dispatcher_type()? {
62            DispatcherType::Unspecified => unreachable!(),
63            DispatcherType::Hash | DispatcherType::Broadcast => false,
64            // There could be arbitrary number of upstreams with simple dispatcher.
65            DispatcherType::Simple => false,
66            // There should be always only one upstream with no-shuffle dispatcher.
67            DispatcherType::NoShuffle => true,
68        };
69
70        let upstreams = if always_single_input {
71            MergeExecutorUpstream::Singleton(inputs.into_iter().exactly_one().unwrap())
72        } else {
73            MergeExecutorUpstream::Merge(MergeExecutor::new_select_receiver(
74                inputs,
75                &executor_stats,
76                &actor_context,
77            ))
78        };
79        Ok(MergeExecutorInput::new(
80            upstreams,
81            actor_context,
82            upstream_fragment_id,
83            local_barrier_manager,
84            executor_stats,
85            info,
86            chunk_size,
87        ))
88    }
89}
90
91impl ExecutorBuilder for MergeExecutorBuilder {
92    type Node = MergeNode;
93
94    async fn new_boxed_executor(
95        params: ExecutorParams,
96        node: &Self::Node,
97        _store: impl StateStore,
98    ) -> StreamResult<Executor> {
99        let barrier_rx = params
100            .local_barrier_manager
101            .subscribe_barrier(params.actor_context.id);
102        Ok(Self::new_input(
103            params.local_barrier_manager,
104            params.executor_stats,
105            params.actor_context,
106            params.info,
107            node,
108            params.env.config().developer.chunk_size,
109        )
110        .await?
111        .into_executor(barrier_rx))
112    }
113}