risingwave_stream/from_proto/
source_backfill.rs1use risingwave_common::catalog::TableId;
16use risingwave_connector::WithOptionsSecResolved;
17use risingwave_pb::stream_plan::SourceBackfillNode;
18
19use super::*;
20use crate::executor::source::{
21 BackfillStateTableHandler, SourceBackfillExecutor, SourceBackfillExecutorInner,
22 SourceStateTableHandler, StreamSourceCore,
23};
24
25pub struct SourceBackfillExecutorBuilder;
26
27impl ExecutorBuilder for SourceBackfillExecutorBuilder {
28 type Node = SourceBackfillNode;
29
30 async fn new_boxed_executor(
31 params: ExecutorParams,
32 node: &Self::Node,
33 store: impl StateStore,
34 ) -> StreamResult<Executor> {
35 let source_id = TableId::new(node.upstream_source_id);
36 let source_name = node.source_name.clone();
37 let source_info = node.get_info()?;
38
39 let options_with_secret =
40 WithOptionsSecResolved::new(node.with_properties.clone(), node.secret_refs.clone());
41 let source_desc_builder = super::source::create_source_desc_builder(
42 node.columns.clone(),
43 ¶ms,
44 source_info.clone(),
45 node.row_id_index,
46 options_with_secret,
47 );
48
49 let source_column_ids: Vec<_> = source_desc_builder
50 .column_catalogs_to_source_column_descs()
51 .iter()
52 .map(|column| column.column_id)
53 .collect();
54
55 let state_table_handler = SourceStateTableHandler::from_table_catalog(
57 node.state_table.as_ref().unwrap(),
58 store.clone(),
59 )
60 .await;
61 let backfill_state_table = BackfillStateTableHandler::from_table_catalog(
62 node.state_table.as_ref().unwrap(),
63 store.clone(),
64 )
65 .await;
66 let stream_source_core = StreamSourceCore::new(
67 source_id,
68 source_name,
69 source_column_ids,
70 source_desc_builder,
71 state_table_handler,
72 );
73 let progress = params
74 .local_barrier_manager
75 .register_create_mview_progress(params.actor_context.id);
76
77 let exec = SourceBackfillExecutorInner::new(
78 params.actor_context.clone(),
79 params.info.clone(),
80 stream_source_core,
81 params.executor_stats.clone(),
82 params.env.system_params_manager_ref().get_params(),
83 backfill_state_table,
84 node.rate_limit,
85 progress,
86 );
87 let [input]: [_; 1] = params.input.try_into().unwrap();
88
89 Ok((
90 params.info,
91 SourceBackfillExecutor { inner: exec, input }.boxed(),
92 )
93 .into())
94 }
95}