risingwave_stream/from_proto/
source_backfill.rs1use risingwave_connector::WithOptionsSecResolved;
16use risingwave_pb::stream_plan::SourceBackfillNode;
17
18use super::*;
19use crate::executor::source::{
20 BackfillStateTableHandler, SourceBackfillExecutor, SourceBackfillExecutorInner,
21 SourceStateTableHandler, StreamSourceCore,
22};
23
24pub struct SourceBackfillExecutorBuilder;
25
26impl ExecutorBuilder for SourceBackfillExecutorBuilder {
27 type Node = SourceBackfillNode;
28
29 async fn new_boxed_executor(
30 params: ExecutorParams,
31 node: &Self::Node,
32 store: impl StateStore,
33 ) -> StreamResult<Executor> {
34 let source_id = node.upstream_source_id;
35 let source_name = node.source_name.clone();
36 let source_info = node.get_info()?;
37
38 let options_with_secret =
39 WithOptionsSecResolved::new(node.with_properties.clone(), node.secret_refs.clone());
40 let source_desc_builder = super::source::create_source_desc_builder(
41 node.columns.clone(),
42 ¶ms,
43 source_info.clone(),
44 node.row_id_index,
45 options_with_secret,
46 );
47
48 let source_column_ids: Vec<_> = source_desc_builder
49 .column_catalogs_to_source_column_descs()
50 .iter()
51 .map(|column| column.column_id)
52 .collect();
53
54 let state_table_handler = SourceStateTableHandler::from_table_catalog(
56 node.state_table.as_ref().unwrap(),
57 store.clone(),
58 )
59 .await;
60 let backfill_state_table = BackfillStateTableHandler::from_table_catalog(
61 node.state_table.as_ref().unwrap(),
62 store.clone(),
63 )
64 .await;
65 let stream_source_core = StreamSourceCore::new(
66 source_id,
67 source_name,
68 source_column_ids,
69 source_desc_builder,
70 state_table_handler,
71 );
72 let progress = params
73 .local_barrier_manager
74 .register_create_mview_progress(¶ms.actor_context);
75
76 let exec = SourceBackfillExecutorInner::new(
77 params.actor_context.clone(),
78 params.info.clone(),
79 stream_source_core,
80 params.executor_stats.clone(),
81 params.env.system_params_manager_ref().get_params(),
82 backfill_state_table,
83 node.rate_limit,
84 progress,
85 );
86 let [input]: [_; 1] = params.input.try_into().unwrap();
87
88 Ok((
89 params.info,
90 SourceBackfillExecutor { inner: exec, input }.boxed(),
91 )
92 .into())
93 }
94}