Skip to main content

risingwave_stream/from_proto/
source_backfill.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_connector::WithOptionsSecResolved;
16use risingwave_pb::stream_plan::SourceBackfillNode;
17
18use super::*;
19use crate::executor::source::{
20    BackfillStateTableHandler, SourceBackfillExecutor, SourceBackfillExecutorInner,
21    SourceStateTableHandler, StreamSourceCore,
22};
23
24pub struct SourceBackfillExecutorBuilder;
25
26impl_stream_node_body!(SourceBackfill(SourceBackfillNode) => SourceBackfillExecutorBuilder);
27
28impl ExecutorBuilder for SourceBackfillExecutorBuilder {
29    type Node = SourceBackfillNode;
30
31    async fn new_boxed_executor(
32        params: ExecutorParams,
33        node: &Self::Node,
34        store: impl StateStore,
35    ) -> StreamResult<Executor> {
36        let source_id = node.upstream_source_id;
37        let source_name = node.source_name.clone();
38        let source_info = node.get_info()?;
39
40        let options_with_secret =
41            WithOptionsSecResolved::new(node.with_properties.clone(), node.secret_refs.clone());
42        let source_desc_builder = super::source::create_source_desc_builder(
43            node.columns.clone(),
44            &params,
45            source_info.clone(),
46            node.row_id_index,
47            options_with_secret,
48        );
49
50        let source_column_ids: Vec<_> = source_desc_builder
51            .column_catalogs_to_source_column_descs()
52            .iter()
53            .map(|column| column.column_id)
54            .collect();
55
56        // FIXME: remove this. It's wrong
57        let state_table_handler = SourceStateTableHandler::from_table_catalog(
58            node.state_table.as_ref().unwrap(),
59            store.clone(),
60        )
61        .await;
62        let backfill_state_table = BackfillStateTableHandler::from_table_catalog(
63            node.state_table.as_ref().unwrap(),
64            store.clone(),
65        )
66        .await;
67        let stream_source_core = StreamSourceCore::new(
68            source_id,
69            source_name,
70            source_column_ids,
71            source_desc_builder,
72            state_table_handler,
73        );
74        let progress = params
75            .local_barrier_manager
76            .register_create_mview_progress(&params.actor_context);
77
78        let exec = SourceBackfillExecutorInner::new(
79            params.actor_context.clone(),
80            params.info.clone(),
81            stream_source_core,
82            params.executor_stats.clone(),
83            params.env.system_params_manager_ref().get_params(),
84            backfill_state_table,
85            node.rate_limit,
86            progress,
87        );
88        let [input]: [_; 1] = params.input.try_into().unwrap();
89
90        Ok((
91            params.info,
92            SourceBackfillExecutor { inner: exec, input }.boxed(),
93        )
94            .into())
95    }
96}