risingwave_stream/from_proto/
merge.rs1use std::sync::Arc;
16
17use futures::future::try_join_all;
18use risingwave_pb::stream_plan::{DispatcherType, MergeNode};
19
20use super::*;
21use crate::executor::exchange::input::new_input;
22use crate::executor::monitor::StreamingMetrics;
23use crate::executor::{ActorContextRef, MergeExecutor, MergeExecutorInput, MergeExecutorUpstream};
24use crate::task::LocalBarrierManager;
25
/// Stateless builder that turns a `MergeNode` from the stream plan into either a
/// `MergeExecutorInput` (via `new_input`) or a full `Executor` (via its
/// `ExecutorBuilder` implementation).
26pub struct MergeExecutorBuilder;
27
28impl MergeExecutorBuilder {
29 pub(crate) async fn new_input(
30 local_barrier_manager: LocalBarrierManager,
31 executor_stats: Arc<StreamingMetrics>,
32 actor_context: ActorContextRef,
33 info: ExecutorInfo,
34 node: &MergeNode,
35 chunk_size: usize,
36 ) -> StreamResult<MergeExecutorInput> {
37 let upstream_fragment_id = node.get_upstream_fragment_id();
38
39 let inputs: Vec<_> = try_join_all(
40 actor_context
41 .initial_upstream_actors
42 .get(&node.upstream_fragment_id)
43 .map(|actors| actors.actors.iter())
44 .into_iter()
45 .flatten()
46 .map(|upstream_actor| {
47 new_input(
48 &local_barrier_manager,
49 executor_stats.clone(),
50 actor_context.id,
51 actor_context.fragment_id,
52 upstream_actor,
53 upstream_fragment_id,
54 actor_context.config.clone(),
55 )
56 }),
57 )
58 .await?;
59
60 let always_single_input = match node.get_upstream_dispatcher_type()? {
63 DispatcherType::Unspecified => unreachable!(),
64 DispatcherType::Hash | DispatcherType::Broadcast => false,
65 DispatcherType::Simple => false,
67 DispatcherType::NoShuffle => true,
69 };
70
71 let upstreams = if always_single_input {
72 MergeExecutorUpstream::Singleton(inputs.into_iter().exactly_one().unwrap())
73 } else {
74 MergeExecutorUpstream::Merge(MergeExecutor::new_merge_upstream(
75 inputs,
76 &executor_stats,
77 &actor_context,
78 chunk_size,
79 info.schema.clone(),
80 ))
81 };
82
83 Ok(MergeExecutorInput::new(
84 upstreams,
85 actor_context,
86 upstream_fragment_id,
87 local_barrier_manager,
88 executor_stats,
89 info,
90 ))
91 }
92}
93
94impl ExecutorBuilder for MergeExecutorBuilder {
95 type Node = MergeNode;
96
97 async fn new_boxed_executor(
98 params: ExecutorParams,
99 node: &Self::Node,
100 _store: impl StateStore,
101 ) -> StreamResult<Executor> {
102 let barrier_rx = params
103 .local_barrier_manager
104 .subscribe_barrier(params.actor_context.id);
105 Ok(Self::new_input(
106 params.local_barrier_manager,
107 params.executor_stats,
108 params.actor_context,
109 params.info,
110 node,
111 params.config.developer.chunk_size,
112 )
113 .await?
114 .into_executor(barrier_rx))
115 }
116}