risingwave_stream/from_proto/merge.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use risingwave_pb::stream_plan::{DispatcherType, MergeNode};

use super::*;
use crate::executor::exchange::input::new_input;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{ActorContextRef, MergeExecutor, MergeExecutorInput, MergeExecutorUpstream};
use crate::task::SharedContext;

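/// Builder for merge executors, which combine the streams of multiple upstream actors
/// into a single input stream according to the plan's `MergeNode`.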
pub struct MergeExecutorBuilder;

impl MergeExecutorBuilder {
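    /// Build the upstream exchange inputs for a merge executor from the `MergeNode`,
    /// returning a `MergeExecutorInput` that is turned into the actual executor once a
    /// barrier receiver is attached via `into_executor`.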
    pub(crate) fn new_input(
        shared_context: Arc<SharedContext>,
        executor_stats: Arc<StreamingMetrics>,
        actor_context: ActorContextRef,
        info: ExecutorInfo,
        node: &MergeNode,
        chunk_size: usize,
    ) -> StreamResult<MergeExecutorInput> {
        let upstream_fragment_id = node.get_upstream_fragment_id();

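        // Create one exchange input per initial upstream actor of the upstream fragment,
        // so the merge executor can receive messages from each of them.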
        let inputs: Vec<_> = actor_context
            .initial_upstream_actors
            .get(&node.upstream_fragment_id)
            .map(|actors| actors.actors.iter())
            .into_iter()
            .flatten()
            .map(|&upstream_actor_id| {
                new_input(
                    &shared_context,
                    executor_stats.clone(),
                    actor_context.id,
                    actor_context.fragment_id,
                    upstream_actor_id,
                    upstream_fragment_id,
                )
            })
            .try_collect()?;

        // If there's always exactly one upstream, we can use `ReceiverExecutor`, which
        // cannot scale to multiple upstreams.
        let always_single_input = match node.get_upstream_dispatcher_type()? {
            DispatcherType::Unspecified => unreachable!(),
            DispatcherType::Hash | DispatcherType::Broadcast => false,
            // There could be an arbitrary number of upstreams with a simple dispatcher.
            DispatcherType::Simple => false,
            // There is always exactly one upstream with a no-shuffle dispatcher.
            DispatcherType::NoShuffle => true,
        };

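        // With a single no-shuffle upstream, take the only input directly; otherwise,
        // select over all upstream inputs.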
        let upstreams = if always_single_input {
            MergeExecutorUpstream::Singleton(inputs.into_iter().exactly_one().unwrap())
        } else {
            MergeExecutorUpstream::Merge(MergeExecutor::new_select_receiver(
                inputs,
                &executor_stats,
                &actor_context,
            ))
        };
        Ok(MergeExecutorInput::new(
            upstreams,
            actor_context,
            upstream_fragment_id,
            shared_context,
            executor_stats,
            info,
            chunk_size,
        ))
    }
}

impl ExecutorBuilder for MergeExecutorBuilder {
    type Node = MergeNode;

    async fn new_boxed_executor(
        params: ExecutorParams,
        node: &Self::Node,
        _store: impl StateStore,
    ) -> StreamResult<Executor> {
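        // Subscribe this actor to barriers from the local barrier manager; the receiver
        // is attached to the merge input below to turn it into an executor.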
        let barrier_rx = params
            .local_barrier_manager
            .subscribe_barrier(params.actor_context.id);
        Ok(Self::new_input(
            params.shared_context,
            params.executor_stats,
            params.actor_context,
            params.info,
            node,
            params.env.config().developer.chunk_size,
        )?
        .into_executor(barrier_rx))
    }
}