risingwave_stream/from_proto/
merge.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::try_join_all;
18use risingwave_pb::stream_plan::{DispatcherType, MergeNode};
19
20use super::*;
21use crate::executor::exchange::input::new_input;
22use crate::executor::monitor::StreamingMetrics;
23use crate::executor::{ActorContextRef, MergeExecutor, MergeExecutorInput, MergeExecutorUpstream};
24use crate::task::LocalBarrierManager;
25
/// Stateless builder that turns a [`MergeNode`] plan node into a merge executor
/// (via its `ExecutorBuilder` impl) or into a bare `MergeExecutorInput`
/// (via `MergeExecutorBuilder::new_input`).
pub struct MergeExecutorBuilder;
27
28impl MergeExecutorBuilder {
29    pub(crate) async fn new_input(
30        local_barrier_manager: LocalBarrierManager,
31        executor_stats: Arc<StreamingMetrics>,
32        actor_context: ActorContextRef,
33        info: ExecutorInfo,
34        node: &MergeNode,
35        chunk_size: usize,
36    ) -> StreamResult<MergeExecutorInput> {
37        let upstream_fragment_id = node.get_upstream_fragment_id();
38
39        let inputs: Vec<_> = try_join_all(
40            actor_context
41                .initial_upstream_actors
42                .get(&node.upstream_fragment_id)
43                .map(|actors| actors.actors.iter())
44                .into_iter()
45                .flatten()
46                .map(|upstream_actor| {
47                    new_input(
48                        &local_barrier_manager,
49                        executor_stats.clone(),
50                        actor_context.id,
51                        actor_context.fragment_id,
52                        upstream_actor,
53                        upstream_fragment_id,
54                        actor_context.config.clone(),
55                    )
56                }),
57        )
58        .await?;
59
60        // If there's always only one upstream, we can use `ReceiverExecutor`. Note that it can't
61        // scale to multiple upstreams.
62        let always_single_input = match node.get_upstream_dispatcher_type()? {
63            DispatcherType::Unspecified => unreachable!(),
64            DispatcherType::Hash | DispatcherType::Broadcast => false,
65            // There could be arbitrary number of upstreams with simple dispatcher.
66            DispatcherType::Simple => false,
67            // There should be always only one upstream with no-shuffle dispatcher.
68            DispatcherType::NoShuffle => true,
69        };
70
71        let upstreams = if always_single_input {
72            MergeExecutorUpstream::Singleton(inputs.into_iter().exactly_one().unwrap())
73        } else {
74            MergeExecutorUpstream::Merge(MergeExecutor::new_merge_upstream(
75                inputs,
76                &executor_stats,
77                &actor_context,
78                chunk_size,
79                info.schema.clone(),
80            ))
81        };
82
83        Ok(MergeExecutorInput::new(
84            upstreams,
85            actor_context,
86            upstream_fragment_id,
87            local_barrier_manager,
88            executor_stats,
89            info,
90        ))
91    }
92}
93
94impl ExecutorBuilder for MergeExecutorBuilder {
95    type Node = MergeNode;
96
97    async fn new_boxed_executor(
98        params: ExecutorParams,
99        node: &Self::Node,
100        _store: impl StateStore,
101    ) -> StreamResult<Executor> {
102        let barrier_rx = params
103            .local_barrier_manager
104            .subscribe_barrier(params.actor_context.id);
105        Ok(Self::new_input(
106            params.local_barrier_manager,
107            params.executor_stats,
108            params.actor_context,
109            params.info,
110            node,
111            params.config.developer.chunk_size,
112        )
113        .await?
114        .into_executor(barrier_rx))
115    }
116}