risingwave_stream/executor/backfill/cdc/upstream_table/
external.rs1use risingwave_common::catalog::{Schema, TableId};
16use risingwave_common::util::sort_util::OrderType;
17use risingwave_connector::error::ConnectorResult;
18use risingwave_connector::source::cdc::external::{
19 CdcOffset, CdcTableType, ExternalTableConfig, ExternalTableReader, ExternalTableReaderImpl,
20 SchemaTableName,
21};
22
23#[derive(Debug, Clone)]
25pub struct ExternalStorageTable {
26 table_id: TableId,
28
29 table_name: String,
31
32 schema_name: String,
33
34 database_name: String,
35
36 config: ExternalTableConfig,
37
38 table_type: CdcTableType,
39
40 schema: Schema,
44
45 pk_order_types: Vec<OrderType>,
46
47 pk_indices: Vec<usize>,
50}
51
52impl ExternalStorageTable {
53 #[allow(clippy::too_many_arguments)]
54 pub fn new(
55 table_id: TableId,
56 SchemaTableName {
57 table_name,
58 schema_name,
59 }: SchemaTableName,
60 database_name: String,
61 config: ExternalTableConfig,
62 table_type: CdcTableType,
63 schema: Schema,
64 pk_order_types: Vec<OrderType>,
65 pk_indices: Vec<usize>,
66 ) -> Self {
67 Self {
68 table_id,
69 table_name,
70 schema_name,
71 database_name,
72 config,
73 table_type,
74 schema,
75 pk_order_types,
76 pk_indices,
77 }
78 }
79
80 pub fn table_id(&self) -> TableId {
81 self.table_id
82 }
83
84 pub fn pk_order_types(&self) -> &[OrderType] {
85 &self.pk_order_types
86 }
87
88 pub fn schema(&self) -> &Schema {
89 &self.schema
90 }
91
92 pub fn pk_indices(&self) -> &[usize] {
93 &self.pk_indices
94 }
95
96 pub fn schema_table_name(&self) -> SchemaTableName {
97 SchemaTableName {
98 schema_name: self.schema_name.clone(),
99 table_name: self.table_name.clone(),
100 }
101 }
102
103 pub async fn create_table_reader(&self) -> ConnectorResult<ExternalTableReaderImpl> {
104 self.table_type
105 .create_table_reader(
106 self.config.clone(),
107 self.schema.clone(),
108 self.pk_indices.clone(),
109 )
110 .await
111 }
112
113 pub fn qualified_table_name(&self) -> String {
114 format!("{}.{}", self.schema_name, self.table_name)
115 }
116
117 pub fn database_name(&self) -> &str {
118 self.database_name.as_str()
119 }
120
121 pub async fn current_cdc_offset(
122 &self,
123 table_reader: &ExternalTableReaderImpl,
124 ) -> ConnectorResult<Option<CdcOffset>> {
125 let binlog = table_reader.current_cdc_offset().await?;
126 Ok(Some(binlog))
127 }
128}