risingwave_stream/from_proto/
upstream_sink_union.rs1use risingwave_pb::stream_plan::UpstreamSinkUnionNode;
16
17use super::*;
18use crate::executor::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
19
20pub struct UpstreamSinkUnionExecutorBuilder;
21
22impl ExecutorBuilder for UpstreamSinkUnionExecutorBuilder {
23 type Node = UpstreamSinkUnionNode;
24
25 async fn new_boxed_executor(
26 params: ExecutorParams,
27 node: &Self::Node,
28 _store: impl StateStore,
29 ) -> StreamResult<Executor> {
30 let mut upstreams = Vec::with_capacity(node.get_init_upstreams().len());
31 for init_upstream in node.get_init_upstreams() {
32 let upstream_fragment_id = init_upstream.get_upstream_fragment_id();
33 let upstream_fragment_info = UpstreamFragmentInfo::new(
34 upstream_fragment_id,
35 ¶ms.actor_context.initial_upstream_actors,
36 init_upstream.get_sink_output_schema(),
37 init_upstream.get_project_exprs(),
38 params.eval_error_report.clone(),
39 )?;
40 upstreams.push(upstream_fragment_info);
41 }
42
43 Ok((
44 params.info,
45 UpstreamSinkUnionExecutor::new(
46 params.actor_context,
47 params.local_barrier_manager,
48 params.executor_stats,
49 params.env.config().developer.chunk_size,
50 upstreams,
51 params.eval_error_report,
52 ),
53 )
54 .into())
55 }
56}