risingwave_stream/from_proto/
merge.rsuse std::sync::Arc;
use risingwave_pb::stream_plan::{DispatcherType, MergeNode};
use super::*;
use crate::executor::exchange::input::new_input;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{ActorContextRef, MergeExecutor, MergeExecutorInput, MergeExecutorUpstream};
use crate::task::SharedContext;
pub struct MergeExecutorBuilder;
impl MergeExecutorBuilder {
pub(crate) fn new_input(
shared_context: Arc<SharedContext>,
executor_stats: Arc<StreamingMetrics>,
actor_context: ActorContextRef,
info: ExecutorInfo,
node: &MergeNode,
chunk_size: usize,
) -> StreamResult<MergeExecutorInput> {
let upstreams = node.get_upstream_actor_id();
let upstream_fragment_id = node.get_upstream_fragment_id();
let inputs: Vec<_> = upstreams
.iter()
.map(|&upstream_actor_id| {
new_input(
&shared_context,
executor_stats.clone(),
actor_context.id,
actor_context.fragment_id,
upstream_actor_id,
upstream_fragment_id,
)
})
.try_collect()?;
let always_single_input = match node.get_upstream_dispatcher_type()? {
DispatcherType::Unspecified => unreachable!(),
DispatcherType::Hash | DispatcherType::Broadcast => false,
DispatcherType::Simple => false,
DispatcherType::NoShuffle => true,
};
let upstreams = if always_single_input {
MergeExecutorUpstream::Singleton(inputs.into_iter().exactly_one().unwrap())
} else {
MergeExecutorUpstream::Merge(MergeExecutor::new_select_receiver(
inputs,
&executor_stats,
&actor_context,
))
};
Ok(MergeExecutorInput::new(
upstreams,
actor_context,
upstream_fragment_id,
shared_context,
executor_stats,
info,
chunk_size,
))
}
}
impl ExecutorBuilder for MergeExecutorBuilder {
type Node = MergeNode;
async fn new_boxed_executor(
params: ExecutorParams,
node: &Self::Node,
_store: impl StateStore,
) -> StreamResult<Executor> {
let barrier_rx = params
.shared_context
.local_barrier_manager
.subscribe_barrier(params.actor_context.id);
Ok(Self::new_input(
params.shared_context,
params.executor_stats,
params.actor_context,
params.info,
node,
params.env.config().developer.chunk_size,
)?
.into_executor(barrier_rx))
}
}