risingwave_stream/from_proto/
merge.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::anyhow;
18use futures::future::try_join_all;
19use risingwave_pb::stream_plan::{DispatcherType, MergeNode};
20
21use super::*;
22use crate::executor::exchange::input::new_input;
23use crate::executor::monitor::StreamingMetrics;
24use crate::executor::{ActorContextRef, MergeExecutor, MergeExecutorInput, MergeExecutorUpstream};
25use crate::task::LocalBarrierManager;
26
27pub struct MergeExecutorBuilder;
28
29impl MergeExecutorBuilder {
30    pub(crate) async fn new_input(
31        local_barrier_manager: LocalBarrierManager,
32        executor_stats: Arc<StreamingMetrics>,
33        actor_context: ActorContextRef,
34        info: ExecutorInfo,
35        node: &MergeNode,
36        chunk_size: usize,
37    ) -> StreamResult<Option<MergeExecutorInput>> {
38        let Some(upstream_actors) = actor_context
39            .initial_upstream_actors
40            .get(&node.upstream_fragment_id)
41        else {
42            return Ok(None);
43        };
44        let upstream_fragment_id = node.get_upstream_fragment_id();
45
46        let inputs: Vec<_> = try_join_all(upstream_actors.actors.iter().map(|upstream_actor| {
47            new_input(
48                &local_barrier_manager,
49                executor_stats.clone(),
50                actor_context.id,
51                actor_context.fragment_id,
52                upstream_actor,
53                upstream_fragment_id,
54                actor_context.config.clone(),
55            )
56        }))
57        .await?;
58
59        // If there's always only one upstream, we can use `ReceiverExecutor`. Note that it can't
60        // scale to multiple upstreams.
61        let always_single_input = match node.get_upstream_dispatcher_type()? {
62            DispatcherType::Unspecified => unreachable!(),
63            DispatcherType::Hash | DispatcherType::Broadcast => false,
64            // There could be arbitrary number of upstreams with simple dispatcher.
65            DispatcherType::Simple => false,
66            // There should be always only one upstream with no-shuffle dispatcher.
67            DispatcherType::NoShuffle => true,
68        };
69
70        let upstreams = if always_single_input {
71            MergeExecutorUpstream::Singleton(inputs.into_iter().exactly_one().unwrap())
72        } else {
73            MergeExecutorUpstream::Merge(MergeExecutor::new_merge_upstream(
74                inputs,
75                &executor_stats,
76                &actor_context,
77                chunk_size,
78                info.schema.clone(),
79            ))
80        };
81
82        Ok(Some(MergeExecutorInput::new(
83            upstreams,
84            actor_context,
85            upstream_fragment_id,
86            local_barrier_manager,
87            executor_stats,
88            info,
89        )))
90    }
91}
92
93impl ExecutorBuilder for MergeExecutorBuilder {
94    type Node = MergeNode;
95
96    async fn new_boxed_executor(
97        params: ExecutorParams,
98        node: &Self::Node,
99        _store: impl StateStore,
100    ) -> StreamResult<Executor> {
101        let actor_id = params.actor_context.id;
102        let fragment_id = params.actor_context.fragment_id;
103        let barrier_rx = params.local_barrier_manager.subscribe_barrier(actor_id);
104        Ok(Self::new_input(
105            params.local_barrier_manager,
106            params.executor_stats,
107            params.actor_context,
108            params.info,
109            node,
110            params.config.developer.chunk_size,
111        )
112        .await?
113        .ok_or_else(|| {
114            anyhow!(
115                "no upstream actors found for actor {} in fragment {}",
116                actor_id,
117                fragment_id
118            )
119        })?
120        .into_executor(barrier_rx))
121    }
122}