risingwave_stream/from_proto/
upstream_sink_union.rs1use risingwave_pb::stream_plan::UpstreamSinkUnionNode;
16
17use super::*;
18use crate::executor::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
19
20pub struct UpstreamSinkUnionExecutorBuilder;
21
22impl ExecutorBuilder for UpstreamSinkUnionExecutorBuilder {
23 type Node = UpstreamSinkUnionNode;
24
25 async fn new_boxed_executor(
26 params: ExecutorParams,
27 node: &Self::Node,
28 _store: impl StateStore,
29 ) -> StreamResult<Executor> {
30 let init_upstreams = node
31 .get_init_upstreams()
32 .iter()
33 .map(|init_upstream| {
34 let upstream_fragment_id = init_upstream.get_upstream_fragment_id();
35 UpstreamFragmentInfo::new(
36 upstream_fragment_id,
37 ¶ms.actor_context.initial_upstream_actors,
38 init_upstream.get_sink_output_schema(),
39 init_upstream.get_project_exprs(),
40 params.eval_error_report.clone(),
41 )
42 })
43 .try_collect()?;
44
45 let executor = UpstreamSinkUnionExecutor::new(
46 params.actor_context,
47 params.local_barrier_manager,
48 params.executor_stats,
49 params.env.config().developer.chunk_size,
50 init_upstreams,
51 params.eval_error_report,
52 )
53 .await?;
54
55 Ok((params.info, executor).into())
56 }
57}