risingwave_stream/from_proto/
source_backfill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_connector::WithOptionsSecResolved;
16use risingwave_pb::stream_plan::SourceBackfillNode;
17
18use super::*;
19use crate::executor::source::{
20    BackfillStateTableHandler, SourceBackfillExecutor, SourceBackfillExecutorInner,
21    SourceStateTableHandler, StreamSourceCore,
22};
23
/// Stateless builder that turns a [`SourceBackfillNode`] plan node into a
/// [`SourceBackfillExecutor`] through its [`ExecutorBuilder`] implementation.
pub struct SourceBackfillExecutorBuilder;
25
26impl ExecutorBuilder for SourceBackfillExecutorBuilder {
27    type Node = SourceBackfillNode;
28
29    async fn new_boxed_executor(
30        params: ExecutorParams,
31        node: &Self::Node,
32        store: impl StateStore,
33    ) -> StreamResult<Executor> {
34        let source_id = node.upstream_source_id;
35        let source_name = node.source_name.clone();
36        let source_info = node.get_info()?;
37
38        let options_with_secret =
39            WithOptionsSecResolved::new(node.with_properties.clone(), node.secret_refs.clone());
40        let source_desc_builder = super::source::create_source_desc_builder(
41            node.columns.clone(),
42            &params,
43            source_info.clone(),
44            node.row_id_index,
45            options_with_secret,
46        );
47
48        let source_column_ids: Vec<_> = source_desc_builder
49            .column_catalogs_to_source_column_descs()
50            .iter()
51            .map(|column| column.column_id)
52            .collect();
53
54        // FIXME: remove this. It's wrong
55        let state_table_handler = SourceStateTableHandler::from_table_catalog(
56            node.state_table.as_ref().unwrap(),
57            store.clone(),
58        )
59        .await;
60        let backfill_state_table = BackfillStateTableHandler::from_table_catalog(
61            node.state_table.as_ref().unwrap(),
62            store.clone(),
63        )
64        .await;
65        let stream_source_core = StreamSourceCore::new(
66            source_id,
67            source_name,
68            source_column_ids,
69            source_desc_builder,
70            state_table_handler,
71        );
72        let progress = params
73            .local_barrier_manager
74            .register_create_mview_progress(&params.actor_context);
75
76        let exec = SourceBackfillExecutorInner::new(
77            params.actor_context.clone(),
78            params.info.clone(),
79            stream_source_core,
80            params.executor_stats.clone(),
81            params.env.system_params_manager_ref().get_params(),
82            backfill_state_table,
83            node.rate_limit,
84            progress,
85        );
86        let [input]: [_; 1] = params.input.try_into().unwrap();
87
88        Ok((
89            params.info,
90            SourceBackfillExecutor { inner: exec, input }.boxed(),
91        )
92            .into())
93    }
94}