risingwave_stream/from_proto/
upstream_sink_union.rs1use risingwave_pb::stream_plan::UpstreamSinkUnionNode;
16
17use super::*;
18use crate::executor::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
19
/// Stateless builder that turns an [`UpstreamSinkUnionNode`] plan node into an
/// [`UpstreamSinkUnionExecutor`].
///
/// The type carries no data; all construction logic lives in its
/// `ExecutorBuilder` implementation.
pub struct UpstreamSinkUnionExecutorBuilder;
21
22impl_stream_node_body!(UpstreamSinkUnion(UpstreamSinkUnionNode) => UpstreamSinkUnionExecutorBuilder);
23
24impl ExecutorBuilder for UpstreamSinkUnionExecutorBuilder {
25 type Node = UpstreamSinkUnionNode;
26
27 async fn new_boxed_executor(
28 params: ExecutorParams,
29 node: &Self::Node,
30 _store: impl StateStore,
31 ) -> StreamResult<Executor> {
32 let init_upstreams = node
33 .get_init_upstreams()
34 .iter()
35 .map(|init_upstream| {
36 let upstream_fragment_id = init_upstream.get_upstream_fragment_id();
37 UpstreamFragmentInfo::new(
38 upstream_fragment_id,
39 ¶ms.actor_context.initial_upstream_actors,
40 init_upstream.get_sink_output_schema(),
41 init_upstream.get_project_exprs(),
42 params.eval_error_report.clone(),
43 )
44 })
45 .try_collect()?;
46
47 let executor = UpstreamSinkUnionExecutor::new(
48 params.actor_context,
49 params.local_barrier_manager,
50 params.executor_stats,
51 params.config.developer.chunk_size,
52 init_upstreams,
53 params.eval_error_report,
54 )
55 .await?;
56
57 Ok((params.info, executor).into())
58 }
59}