risingwave_stream/from_proto/
merge.rs1use std::sync::Arc;
16
17use risingwave_pb::stream_plan::{DispatcherType, MergeNode};
18
19use super::*;
20use crate::executor::exchange::input::new_input;
21use crate::executor::monitor::StreamingMetrics;
22use crate::executor::{ActorContextRef, MergeExecutor, MergeExecutorInput, MergeExecutorUpstream};
23use crate::task::SharedContext;
24
25pub struct MergeExecutorBuilder;
26
27impl MergeExecutorBuilder {
28 pub(crate) fn new_input(
29 shared_context: Arc<SharedContext>,
30 executor_stats: Arc<StreamingMetrics>,
31 actor_context: ActorContextRef,
32 info: ExecutorInfo,
33 node: &MergeNode,
34 chunk_size: usize,
35 ) -> StreamResult<MergeExecutorInput> {
36 let upstream_fragment_id = node.get_upstream_fragment_id();
37
38 let inputs: Vec<_> = actor_context
39 .initial_upstream_actors
40 .get(&node.upstream_fragment_id)
41 .map(|actors| actors.actors.iter())
42 .into_iter()
43 .flatten()
44 .map(|&upstream_actor_id| {
45 new_input(
46 &shared_context,
47 executor_stats.clone(),
48 actor_context.id,
49 actor_context.fragment_id,
50 upstream_actor_id,
51 upstream_fragment_id,
52 )
53 })
54 .try_collect()?;
55
56 let always_single_input = match node.get_upstream_dispatcher_type()? {
59 DispatcherType::Unspecified => unreachable!(),
60 DispatcherType::Hash | DispatcherType::Broadcast => false,
61 DispatcherType::Simple => false,
63 DispatcherType::NoShuffle => true,
65 };
66
67 let upstreams = if always_single_input {
68 MergeExecutorUpstream::Singleton(inputs.into_iter().exactly_one().unwrap())
69 } else {
70 MergeExecutorUpstream::Merge(MergeExecutor::new_select_receiver(
71 inputs,
72 &executor_stats,
73 &actor_context,
74 ))
75 };
76 Ok(MergeExecutorInput::new(
77 upstreams,
78 actor_context,
79 upstream_fragment_id,
80 shared_context,
81 executor_stats,
82 info,
83 chunk_size,
84 ))
85 }
86}
87
88impl ExecutorBuilder for MergeExecutorBuilder {
89 type Node = MergeNode;
90
91 async fn new_boxed_executor(
92 params: ExecutorParams,
93 node: &Self::Node,
94 _store: impl StateStore,
95 ) -> StreamResult<Executor> {
96 let barrier_rx = params
97 .local_barrier_manager
98 .subscribe_barrier(params.actor_context.id);
99 Ok(Self::new_input(
100 params.shared_context,
101 params.executor_stats,
102 params.actor_context,
103 params.info,
104 node,
105 params.env.config().developer.chunk_size,
106 )?
107 .into_executor(barrier_rx))
108 }
109}