risingwave_stream/executor/backfill/cdc/upstream_table/external.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{Schema, TableId};
16use risingwave_common::util::sort_util::OrderType;
17use risingwave_connector::error::ConnectorResult;
18use risingwave_connector::source::cdc::external::{
19    CdcOffset, CdcTableType, ExternalTableConfig, ExternalTableReader, ExternalTableReaderImpl,
20    SchemaTableName,
21};
22
23/// This struct represents an external table to be read during backfill
24#[derive(Debug, Clone)]
25pub struct ExternalStorageTable {
26    /// Id for this table.
27    table_id: TableId,
28
29    /// The normalized name of the table, e.g. `dbname.schema_name.table_name`.
30    table_name: String,
31
32    schema_name: String,
33
34    database_name: String,
35
36    config: ExternalTableConfig,
37
38    table_type: CdcTableType,
39
40    /// The schema of the output columns, i.e., this table VIEWED BY some executor like
41    /// `RowSeqScanExecutor`.
42    /// todo: the schema of the external table defined in the CREATE TABLE DDL
43    schema: Schema,
44
45    pk_order_types: Vec<OrderType>,
46
47    /// Indices of primary key.
48    /// Note that the index is based on the all columns of the table.
49    pk_indices: Vec<usize>,
50}
51
52impl ExternalStorageTable {
53    #[allow(clippy::too_many_arguments)]
54    pub fn new(
55        table_id: TableId,
56        SchemaTableName {
57            table_name,
58            schema_name,
59        }: SchemaTableName,
60        database_name: String,
61        config: ExternalTableConfig,
62        table_type: CdcTableType,
63        schema: Schema,
64        pk_order_types: Vec<OrderType>,
65        pk_indices: Vec<usize>,
66    ) -> Self {
67        Self {
68            table_id,
69            table_name,
70            schema_name,
71            database_name,
72            config,
73            table_type,
74            schema,
75            pk_order_types,
76            pk_indices,
77        }
78    }
79
80    pub fn table_id(&self) -> TableId {
81        self.table_id
82    }
83
84    pub fn pk_order_types(&self) -> &[OrderType] {
85        &self.pk_order_types
86    }
87
88    pub fn schema(&self) -> &Schema {
89        &self.schema
90    }
91
92    pub fn pk_indices(&self) -> &[usize] {
93        &self.pk_indices
94    }
95
96    pub fn schema_table_name(&self) -> SchemaTableName {
97        SchemaTableName {
98            schema_name: self.schema_name.clone(),
99            table_name: self.table_name.clone(),
100        }
101    }
102
103    pub async fn create_table_reader(&self) -> ConnectorResult<ExternalTableReaderImpl> {
104        self.table_type
105            .create_table_reader(
106                self.config.clone(),
107                self.schema.clone(),
108                self.pk_indices.clone(),
109            )
110            .await
111    }
112
113    pub fn qualified_table_name(&self) -> String {
114        format!("{}.{}", self.schema_name, self.table_name)
115    }
116
117    pub fn database_name(&self) -> &str {
118        self.database_name.as_str()
119    }
120
121    pub async fn current_cdc_offset(
122        &self,
123        table_reader: &ExternalTableReaderImpl,
124    ) -> ConnectorResult<Option<CdcOffset>> {
125        let binlog = table_reader.current_cdc_offset().await?;
126        Ok(Some(binlog))
127    }
128}