risingwave_stream/from_proto/
merge.rs1use std::sync::Arc;
16
17use anyhow::anyhow;
18use futures::future::try_join_all;
19use risingwave_pb::stream_plan::{DispatcherType, MergeNode};
20
21use super::*;
22use crate::executor::exchange::input::new_input;
23use crate::executor::monitor::StreamingMetrics;
24use crate::executor::{ActorContextRef, MergeExecutor, MergeExecutorInput, MergeExecutorUpstream};
25use crate::task::LocalBarrierManager;
26
27pub struct MergeExecutorBuilder;
28
29impl MergeExecutorBuilder {
30 pub(crate) async fn new_input(
31 local_barrier_manager: LocalBarrierManager,
32 executor_stats: Arc<StreamingMetrics>,
33 actor_context: ActorContextRef,
34 info: ExecutorInfo,
35 node: &MergeNode,
36 chunk_size: usize,
37 ) -> StreamResult<Option<MergeExecutorInput>> {
38 let Some(upstream_actors) = actor_context
39 .initial_upstream_actors
40 .get(&node.upstream_fragment_id)
41 else {
42 return Ok(None);
43 };
44 let upstream_fragment_id = node.get_upstream_fragment_id();
45
46 let inputs: Vec<_> = try_join_all(upstream_actors.actors.iter().map(|upstream_actor| {
47 new_input(
48 &local_barrier_manager,
49 executor_stats.clone(),
50 actor_context.id,
51 actor_context.fragment_id,
52 upstream_actor,
53 upstream_fragment_id,
54 actor_context.config.clone(),
55 )
56 }))
57 .await?;
58
59 let always_single_input = match node.get_upstream_dispatcher_type()? {
62 DispatcherType::Unspecified => unreachable!(),
63 DispatcherType::Hash | DispatcherType::Broadcast => false,
64 DispatcherType::Simple => false,
66 DispatcherType::NoShuffle => true,
68 };
69
70 let upstreams = if always_single_input {
71 MergeExecutorUpstream::Singleton(inputs.into_iter().exactly_one().unwrap())
72 } else {
73 MergeExecutorUpstream::Merge(MergeExecutor::new_merge_upstream(
74 inputs,
75 &executor_stats,
76 &actor_context,
77 chunk_size,
78 info.schema.clone(),
79 ))
80 };
81
82 Ok(Some(MergeExecutorInput::new(
83 upstreams,
84 actor_context,
85 upstream_fragment_id,
86 local_barrier_manager,
87 executor_stats,
88 info,
89 )))
90 }
91}
92
93impl ExecutorBuilder for MergeExecutorBuilder {
94 type Node = MergeNode;
95
96 async fn new_boxed_executor(
97 params: ExecutorParams,
98 node: &Self::Node,
99 _store: impl StateStore,
100 ) -> StreamResult<Executor> {
101 let actor_id = params.actor_context.id;
102 let fragment_id = params.actor_context.fragment_id;
103 let barrier_rx = params.local_barrier_manager.subscribe_barrier(actor_id);
104 Ok(Self::new_input(
105 params.local_barrier_manager,
106 params.executor_stats,
107 params.actor_context,
108 params.info,
109 node,
110 params.config.developer.chunk_size,
111 )
112 .await?
113 .ok_or_else(|| {
114 anyhow!(
115 "no upstream actors found for actor {} in fragment {}",
116 actor_id,
117 fragment_id
118 )
119 })?
120 .into_executor(barrier_rx))
121 }
122}