risingwave_stream/from_proto/
source_backfill.rs1use risingwave_connector::WithOptionsSecResolved;
16use risingwave_pb::stream_plan::SourceBackfillNode;
17
18use super::*;
19use crate::executor::source::{
20 BackfillStateTableHandler, SourceBackfillExecutor, SourceBackfillExecutorInner,
21 SourceStateTableHandler, StreamSourceCore,
22};
23
/// Stateless builder that turns a `SourceBackfillNode` plan node into a
/// `SourceBackfillExecutor` through its `ExecutorBuilder` implementation.
pub struct SourceBackfillExecutorBuilder;
25
26impl_stream_node_body!(SourceBackfill(SourceBackfillNode) => SourceBackfillExecutorBuilder);
27
28impl ExecutorBuilder for SourceBackfillExecutorBuilder {
29 type Node = SourceBackfillNode;
30
31 async fn new_boxed_executor(
32 params: ExecutorParams,
33 node: &Self::Node,
34 store: impl StateStore,
35 ) -> StreamResult<Executor> {
36 let source_id = node.upstream_source_id;
37 let source_name = node.source_name.clone();
38 let source_info = node.get_info()?;
39
40 let options_with_secret =
41 WithOptionsSecResolved::new(node.with_properties.clone(), node.secret_refs.clone());
42 let source_desc_builder = super::source::create_source_desc_builder(
43 node.columns.clone(),
44 ¶ms,
45 source_info.clone(),
46 node.row_id_index,
47 options_with_secret,
48 );
49
50 let source_column_ids: Vec<_> = source_desc_builder
51 .column_catalogs_to_source_column_descs()
52 .iter()
53 .map(|column| column.column_id)
54 .collect();
55
56 let state_table_handler = SourceStateTableHandler::from_table_catalog(
58 node.state_table.as_ref().unwrap(),
59 store.clone(),
60 )
61 .await;
62 let backfill_state_table = BackfillStateTableHandler::from_table_catalog(
63 node.state_table.as_ref().unwrap(),
64 store.clone(),
65 )
66 .await;
67 let stream_source_core = StreamSourceCore::new(
68 source_id,
69 source_name,
70 source_column_ids,
71 source_desc_builder,
72 state_table_handler,
73 );
74 let progress = params
75 .local_barrier_manager
76 .register_create_mview_progress(¶ms.actor_context);
77
78 let exec = SourceBackfillExecutorInner::new(
79 params.actor_context.clone(),
80 params.info.clone(),
81 stream_source_core,
82 params.executor_stats.clone(),
83 params.env.system_params_manager_ref().get_params(),
84 backfill_state_table,
85 node.rate_limit,
86 progress,
87 );
88 let [input]: [_; 1] = params.input.try_into().unwrap();
89
90 Ok((
91 params.info,
92 SourceBackfillExecutor { inner: exec, input }.boxed(),
93 )
94 .into())
95 }
96}