risingwave_stream/from_proto/
merge.rs1use std::sync::Arc;
16
17use futures::future::try_join_all;
18use risingwave_pb::stream_plan::{DispatcherType, MergeNode};
19
20use super::*;
21use crate::executor::exchange::input::new_input;
22use crate::executor::monitor::StreamingMetrics;
23use crate::executor::{ActorContextRef, MergeExecutor, MergeExecutorInput, MergeExecutorUpstream};
24use crate::task::LocalBarrierManager;
25
26pub struct MergeExecutorBuilder;
27
28impl MergeExecutorBuilder {
29 pub(crate) async fn new_input(
30 local_barrier_manager: LocalBarrierManager,
31 executor_stats: Arc<StreamingMetrics>,
32 actor_context: ActorContextRef,
33 info: ExecutorInfo,
34 node: &MergeNode,
35 chunk_size: usize,
36 ) -> StreamResult<MergeExecutorInput> {
37 let upstream_fragment_id = node.get_upstream_fragment_id();
38
39 let inputs: Vec<_> = try_join_all(
40 actor_context
41 .initial_upstream_actors
42 .get(&node.upstream_fragment_id)
43 .map(|actors| actors.actors.iter())
44 .into_iter()
45 .flatten()
46 .map(|upstream_actor| {
47 new_input(
48 &local_barrier_manager,
49 executor_stats.clone(),
50 actor_context.id,
51 actor_context.fragment_id,
52 upstream_actor,
53 upstream_fragment_id,
54 )
55 }),
56 )
57 .await?;
58
59 let always_single_input = match node.get_upstream_dispatcher_type()? {
62 DispatcherType::Unspecified => unreachable!(),
63 DispatcherType::Hash | DispatcherType::Broadcast => false,
64 DispatcherType::Simple => false,
66 DispatcherType::NoShuffle => true,
68 };
69
70 let upstreams = if always_single_input {
71 MergeExecutorUpstream::Singleton(inputs.into_iter().exactly_one().unwrap())
72 } else {
73 MergeExecutorUpstream::Merge(MergeExecutor::new_select_receiver(
74 inputs,
75 &executor_stats,
76 &actor_context,
77 ))
78 };
79 Ok(MergeExecutorInput::new(
80 upstreams,
81 actor_context,
82 upstream_fragment_id,
83 local_barrier_manager,
84 executor_stats,
85 info,
86 chunk_size,
87 ))
88 }
89}
90
91impl ExecutorBuilder for MergeExecutorBuilder {
92 type Node = MergeNode;
93
94 async fn new_boxed_executor(
95 params: ExecutorParams,
96 node: &Self::Node,
97 _store: impl StateStore,
98 ) -> StreamResult<Executor> {
99 let barrier_rx = params
100 .local_barrier_manager
101 .subscribe_barrier(params.actor_context.id);
102 Ok(Self::new_input(
103 params.local_barrier_manager,
104 params.executor_stats,
105 params.actor_context,
106 params.info,
107 node,
108 params.env.config().developer.chunk_size,
109 )
110 .await?
111 .into_executor(barrier_rx))
112 }
113}