Skip to main content

risingwave_stream/from_proto/
merge.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::anyhow;
18use futures::future::try_join_all;
19use risingwave_pb::stream_plan::{DispatcherType, MergeNode};
20
21use super::*;
22use crate::executor::exchange::input::new_input;
23use crate::executor::monitor::StreamingMetrics;
24use crate::executor::{ActorContextRef, MergeExecutor, MergeExecutorInput, MergeExecutorUpstream};
25use crate::task::LocalBarrierManager;
26
27pub struct MergeExecutorBuilder;
28
29impl MergeExecutorBuilder {
30    pub(crate) async fn new_input(
31        local_barrier_manager: LocalBarrierManager,
32        executor_stats: Arc<StreamingMetrics>,
33        actor_context: ActorContextRef,
34        info: ExecutorInfo,
35        node: &MergeNode,
36        chunk_size: usize,
37    ) -> StreamResult<Option<MergeExecutorInput>> {
38        let Some(upstream_actors) = actor_context
39            .initial_upstream_actors
40            .get(&node.upstream_fragment_id)
41        else {
42            return Ok(None);
43        };
44        let upstream_fragment_id = node.get_upstream_fragment_id();
45
46        let inputs: Vec<_> = try_join_all(upstream_actors.actors.iter().map(|upstream_actor| {
47            new_input(
48                &local_barrier_manager,
49                executor_stats.clone(),
50                actor_context.id,
51                actor_context.fragment_id,
52                upstream_actor,
53                upstream_fragment_id,
54                actor_context.config.clone(),
55            )
56        }))
57        .await?;
58
59        // If there's always only one upstream, we can use `ReceiverExecutor`. Note that it can't
60        // scale to multiple upstreams.
61        let always_single_input = match node.get_upstream_dispatcher_type()? {
62            DispatcherType::Unspecified => unreachable!(),
63            DispatcherType::Hash | DispatcherType::Broadcast => false,
64            // There could be arbitrary number of upstreams with simple dispatcher.
65            DispatcherType::Simple => false,
66            // There should be always only one upstream with no-shuffle dispatcher.
67            DispatcherType::NoShuffle => true,
68        };
69
70        let upstreams = if always_single_input {
71            MergeExecutorUpstream::Singleton(Itertools::exactly_one(inputs.into_iter()).unwrap())
72        } else {
73            MergeExecutorUpstream::Merge(MergeExecutor::new_merge_upstream(
74                inputs,
75                &executor_stats,
76                &actor_context,
77                chunk_size,
78                info.schema.clone(),
79            ))
80        };
81
82        Ok(Some(MergeExecutorInput::new(
83            upstreams,
84            actor_context,
85            upstream_fragment_id,
86            local_barrier_manager,
87            executor_stats,
88            info,
89        )))
90    }
91}
92
93impl_stream_node_body!(Merge(MergeNode) => MergeExecutorBuilder);
94
95impl ExecutorBuilder for MergeExecutorBuilder {
96    type Node = MergeNode;
97
98    async fn new_boxed_executor(
99        params: ExecutorParams,
100        node: &Self::Node,
101        _store: impl StateStore,
102    ) -> StreamResult<Executor> {
103        let actor_id = params.actor_context.id;
104        let fragment_id = params.actor_context.fragment_id;
105        let barrier_rx = params.local_barrier_manager.subscribe_barrier(actor_id);
106        Ok(Self::new_input(
107            params.local_barrier_manager,
108            params.executor_stats,
109            params.actor_context,
110            params.info,
111            node,
112            params.config.developer.chunk_size,
113        )
114        .await?
115        .ok_or_else(|| {
116            anyhow!(
117                "no upstream actors found for actor {} in fragment {}",
118                actor_id,
119                fragment_id
120            )
121        })?
122        .into_executor(barrier_rx))
123    }
124}