risingwave_stream/executor/backfill/cdc/upstream_table/
external.rs1use risingwave_common::catalog::{Schema, TableId};
16use risingwave_common::util::sort_util::OrderType;
17use risingwave_connector::error::ConnectorResult;
18use risingwave_connector::source::cdc::external::{
19 CdcOffset, CdcTableType, ExternalTableConfig, ExternalTableReader, ExternalTableReaderImpl,
20 SchemaTableName,
21};
22
23#[derive(Debug, Clone)]
25pub struct ExternalStorageTable {
26 table_id: TableId,
28
29 table_name: String,
31
32 schema_name: String,
33
34 database_name: String,
35
36 config: ExternalTableConfig,
37
38 table_type: CdcTableType,
39
40 schema: Schema,
44
45 pk_order_types: Vec<OrderType>,
46
47 pk_indices: Vec<usize>,
50}
51
52impl ExternalStorageTable {
53 #[allow(clippy::too_many_arguments)]
54 pub fn new(
55 table_id: TableId,
56 SchemaTableName {
57 table_name,
58 schema_name,
59 }: SchemaTableName,
60 database_name: String,
61 config: ExternalTableConfig,
62 table_type: CdcTableType,
63 schema: Schema,
64 pk_order_types: Vec<OrderType>,
65 pk_indices: Vec<usize>,
66 ) -> Self {
67 Self {
68 table_id,
69 table_name,
70 schema_name,
71 database_name,
72 config,
73 table_type,
74 schema,
75 pk_order_types,
76 pk_indices,
77 }
78 }
79
80 #[cfg(test)]
81 pub fn for_test_undefined() -> Self {
82 Self {
83 table_id: 1.into(),
84 table_name: "for_test_table_name".into(),
85 schema_name: "for_test_schema_name".into(),
86 database_name: "for_test_database_name".into(),
87 config: ExternalTableConfig::default(),
88 table_type: CdcTableType::Undefined,
89 schema: Schema::empty().to_owned(),
90 pk_order_types: vec![],
91 pk_indices: vec![],
92 }
93 }
94
95 pub fn table_id(&self) -> TableId {
96 self.table_id
97 }
98
99 pub fn pk_order_types(&self) -> &[OrderType] {
100 &self.pk_order_types
101 }
102
103 pub fn schema(&self) -> &Schema {
104 &self.schema
105 }
106
107 pub fn pk_indices(&self) -> &[usize] {
108 &self.pk_indices
109 }
110
111 pub fn schema_table_name(&self) -> SchemaTableName {
112 SchemaTableName {
113 schema_name: self.schema_name.clone(),
114 table_name: self.table_name.clone(),
115 }
116 }
117
118 pub async fn create_table_reader(&self) -> ConnectorResult<ExternalTableReaderImpl> {
119 self.table_type
120 .create_table_reader(
121 self.config.clone(),
122 self.schema.clone(),
123 self.pk_indices.clone(),
124 )
125 .await
126 }
127
128 pub fn qualified_table_name(&self) -> String {
129 format!("{}.{}", self.schema_name, self.table_name)
130 }
131
132 pub fn database_name(&self) -> &str {
133 self.database_name.as_str()
134 }
135
136 pub async fn current_cdc_offset(
137 &self,
138 table_reader: &ExternalTableReaderImpl,
139 ) -> ConnectorResult<Option<CdcOffset>> {
140 let binlog = table_reader.current_cdc_offset().await?;
141 Ok(Some(binlog))
142 }
143}