risingwave_stream/executor/backfill/cdc/upstream_table/
external.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{Schema, TableId};
16use risingwave_common::util::sort_util::OrderType;
17use risingwave_connector::error::ConnectorResult;
18use risingwave_connector::source::cdc::external::{
19    CdcOffset, CdcTableType, ExternalTableConfig, ExternalTableReader, ExternalTableReaderImpl,
20    SchemaTableName,
21};
22
23/// This struct represents an external table to be read during backfill
24#[derive(Debug, Clone)]
25pub struct ExternalStorageTable {
26    /// Id for this table.
27    table_id: TableId,
28
29    /// The normalized name of the table, e.g. `dbname.schema_name.table_name`.
30    table_name: String,
31
32    schema_name: String,
33
34    database_name: String,
35
36    config: ExternalTableConfig,
37
38    table_type: CdcTableType,
39
40    /// The schema of the output columns, i.e., this table VIEWED BY some executor like
41    /// `RowSeqScanExecutor`.
42    /// todo: the schema of the external table defined in the CREATE TABLE DDL
43    schema: Schema,
44
45    pk_order_types: Vec<OrderType>,
46
47    /// Indices of primary key.
48    /// Note that the index is based on the all columns of the table.
49    pk_indices: Vec<usize>,
50}
51
52impl ExternalStorageTable {
53    #[allow(clippy::too_many_arguments)]
54    pub fn new(
55        table_id: TableId,
56        SchemaTableName {
57            table_name,
58            schema_name,
59        }: SchemaTableName,
60        database_name: String,
61        config: ExternalTableConfig,
62        table_type: CdcTableType,
63        schema: Schema,
64        pk_order_types: Vec<OrderType>,
65        pk_indices: Vec<usize>,
66    ) -> Self {
67        Self {
68            table_id,
69            table_name,
70            schema_name,
71            database_name,
72            config,
73            table_type,
74            schema,
75            pk_order_types,
76            pk_indices,
77        }
78    }
79
80    #[cfg(test)]
81    pub fn for_test_undefined() -> Self {
82        Self {
83            table_id: 1.into(),
84            table_name: "for_test_table_name".into(),
85            schema_name: "for_test_schema_name".into(),
86            database_name: "for_test_database_name".into(),
87            config: ExternalTableConfig::default(),
88            table_type: CdcTableType::Undefined,
89            schema: Schema::empty().to_owned(),
90            pk_order_types: vec![],
91            pk_indices: vec![],
92        }
93    }
94
95    pub fn table_id(&self) -> TableId {
96        self.table_id
97    }
98
99    pub fn pk_order_types(&self) -> &[OrderType] {
100        &self.pk_order_types
101    }
102
103    pub fn schema(&self) -> &Schema {
104        &self.schema
105    }
106
107    pub fn pk_indices(&self) -> &[usize] {
108        &self.pk_indices
109    }
110
111    pub fn schema_table_name(&self) -> SchemaTableName {
112        SchemaTableName {
113            schema_name: self.schema_name.clone(),
114            table_name: self.table_name.clone(),
115        }
116    }
117
118    pub async fn create_table_reader(&self) -> ConnectorResult<ExternalTableReaderImpl> {
119        self.table_type
120            .create_table_reader(
121                self.config.clone(),
122                self.schema.clone(),
123                self.pk_indices.clone(),
124            )
125            .await
126    }
127
128    pub fn qualified_table_name(&self) -> String {
129        format!("{}.{}", self.schema_name, self.table_name)
130    }
131
132    pub fn database_name(&self) -> &str {
133        self.database_name.as_str()
134    }
135
136    pub async fn current_cdc_offset(
137        &self,
138        table_reader: &ExternalTableReaderImpl,
139    ) -> ConnectorResult<Option<CdcOffset>> {
140        let binlog = table_reader.current_cdc_offset().await?;
141        Ok(Some(binlog))
142    }
143}