1use std::collections::{HashMap, HashSet};
16use std::sync::LazyLock;
17
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId, max_column_id};
20use risingwave_common::types::{DataType, StructType};
21use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
22use risingwave_pb::plan_common::{
23 AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader,
24 AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset,
25 AdditionalColumnPartition, AdditionalColumnPayload, AdditionalColumnTimestamp,
26 AdditionalDatabaseName, AdditionalSchemaName, AdditionalSubject, AdditionalTableName,
27};
28
29use crate::error::ConnectorResult;
30use crate::source::cdc::MONGODB_CDC_CONNECTOR;
31use crate::source::{
32 AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, MQTT_CONNECTOR,
33 NATS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
34};
35
/// Additional column types accepted for any connector that has no dedicated
/// entry in [`COMPATIBLE_ADDITIONAL_COLUMNS`].
pub static COMMON_COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashSet<&'static str>> =
    LazyLock::new(|| ["partition", "offset"].into_iter().collect());
39
40pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet<&'static str>>> =
41 LazyLock::new(|| {
42 HashMap::from([
43 (
44 KAFKA_CONNECTOR,
45 HashSet::from([
46 "key",
47 "timestamp",
48 "partition",
49 "offset",
50 "header",
51 "payload",
52 ]),
53 ),
54 (
55 PULSAR_CONNECTOR,
56 HashSet::from(["key", "partition", "offset", "payload"]),
57 ),
58 (
59 KINESIS_CONNECTOR,
60 HashSet::from(["key", "partition", "offset", "timestamp", "payload"]),
61 ),
62 (
63 NATS_CONNECTOR,
64 HashSet::from(["partition", "offset", "payload", "subject"]),
65 ),
66 (
67 OPENDAL_S3_CONNECTOR,
68 HashSet::from(["file", "offset", "payload"]),
69 ),
70 (GCS_CONNECTOR, HashSet::from(["file", "offset", "payload"])),
71 (
72 AZBLOB_CONNECTOR,
73 HashSet::from(["file", "offset", "payload"]),
74 ),
75 (
76 POSIX_FS_CONNECTOR,
77 HashSet::from(["file", "offset", "payload"]),
78 ),
79 (
81 MONGODB_CDC_CONNECTOR,
82 HashSet::from([
83 "timestamp",
84 "partition",
85 "offset",
86 "database_name",
87 "collection_name",
88 ]),
89 ),
90 (MQTT_CONNECTOR, HashSet::from(["offset", "partition"])),
91 ])
92 });
93
/// Additional column types accepted on CDC backfill tables, regardless of the
/// upstream connector. Wrapped in `Option` so it can be returned directly from
/// [`get_supported_additional_columns`].
pub static CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS: LazyLock<Option<HashSet<&'static str>>> =
    LazyLock::new(|| {
        let cols = ["timestamp", "database_name", "schema_name", "table_name"];
        Some(cols.into_iter().collect())
    });
104
105pub fn get_supported_additional_columns(
106 connector_name: &str,
107 is_cdc_backfill: bool,
108) -> Option<&HashSet<&'static str>> {
109 if is_cdc_backfill {
110 CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS.as_ref()
111 } else {
112 COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name)
113 }
114}
115
116pub fn gen_default_addition_col_name(
117 connector_name: &str,
118 additional_col_type: &str,
119 inner_field_name: Option<&str>,
120 data_type: Option<&DataType>,
121) -> String {
122 let legacy_dt_name = data_type.map(|dt| format!("{:?}", dt).to_lowercase());
123 let col_name = [
124 Some(connector_name),
125 Some(additional_col_type),
126 inner_field_name,
127 legacy_dt_name.as_deref(),
128 ];
129 col_name.iter().fold("_rw".to_owned(), |name, ele| {
130 if let Some(ele) = ele {
131 format!("{}_{}", name, ele)
132 } else {
133 name
134 }
135 })
136}
137
138pub fn build_additional_column_desc(
139 column_id: ColumnId,
140 connector_name: &str,
141 additional_col_type: &str,
142 column_alias: Option<String>,
143 inner_field_name: Option<&str>,
144 data_type: Option<&DataType>,
145 reject_unknown_connector: bool,
146 is_cdc_backfill_table: bool,
147) -> ConnectorResult<ColumnDesc> {
148 let compatible_columns = match (
149 get_supported_additional_columns(connector_name, is_cdc_backfill_table),
150 reject_unknown_connector,
151 ) {
152 (Some(compat_cols), _) => compat_cols,
153 (None, false) => &COMMON_COMPATIBLE_ADDITIONAL_COLUMNS,
154 (None, true) => {
155 bail!(
156 "additional column is not supported for connector {}, acceptable connectors: {:?}",
157 connector_name,
158 COMPATIBLE_ADDITIONAL_COLUMNS.keys(),
159 );
160 }
161 };
162 if !compatible_columns.contains(additional_col_type) {
163 bail!(
164 "additional column type {} is not supported for connector {}, acceptable column types: {:?}",
165 additional_col_type,
166 connector_name,
167 compatible_columns
168 );
169 }
170
171 let column_name = column_alias.unwrap_or_else(|| {
172 gen_default_addition_col_name(
173 connector_name,
174 additional_col_type,
175 inner_field_name,
176 data_type,
177 )
178 });
179
180 let col_desc = match additional_col_type {
181 "key" => ColumnDesc::named_with_additional_column(
182 column_name,
183 column_id,
184 DataType::Bytea,
185 AdditionalColumn {
186 column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
187 },
188 ),
189
190 "timestamp" => ColumnDesc::named_with_additional_column(
191 column_name,
192 column_id,
193 DataType::Timestamptz,
194 AdditionalColumn {
195 column_type: Some(AdditionalColumnType::Timestamp(
196 AdditionalColumnTimestamp {},
197 )),
198 },
199 ),
200 "partition" => ColumnDesc::named_with_additional_column(
201 column_name,
202 column_id,
203 DataType::Varchar,
204 AdditionalColumn {
205 column_type: Some(AdditionalColumnType::Partition(
206 AdditionalColumnPartition {},
207 )),
208 },
209 ),
210 "payload" => ColumnDesc::named_with_additional_column(
211 column_name,
212 column_id,
213 DataType::Jsonb,
214 AdditionalColumn {
215 column_type: Some(AdditionalColumnType::Payload(AdditionalColumnPayload {})),
216 },
217 ),
218 "offset" => ColumnDesc::named_with_additional_column(
219 column_name,
220 column_id,
221 DataType::Varchar,
222 AdditionalColumn {
223 column_type: Some(AdditionalColumnType::Offset(AdditionalColumnOffset {})),
224 },
225 ),
226
227 "file" => ColumnDesc::named_with_additional_column(
228 column_name,
229 column_id,
230 DataType::Varchar,
231 AdditionalColumn {
232 column_type: Some(AdditionalColumnType::Filename(AdditionalColumnFilename {})),
233 },
234 ),
235 "header" => build_header_catalog(column_id, &column_name, inner_field_name, data_type),
236 "database_name" => ColumnDesc::named_with_additional_column(
237 column_name,
238 column_id,
239 DataType::Varchar,
240 AdditionalColumn {
241 column_type: Some(AdditionalColumnType::DatabaseName(
242 AdditionalDatabaseName {},
243 )),
244 },
245 ),
246 "schema_name" => ColumnDesc::named_with_additional_column(
247 column_name,
248 column_id,
249 DataType::Varchar,
250 AdditionalColumn {
251 column_type: Some(AdditionalColumnType::SchemaName(AdditionalSchemaName {})),
252 },
253 ),
254 "table_name" => ColumnDesc::named_with_additional_column(
255 column_name,
256 column_id,
257 DataType::Varchar,
258 AdditionalColumn {
259 column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})),
260 },
261 ),
262 "collection_name" => ColumnDesc::named_with_additional_column(
263 column_name,
264 column_id,
265 DataType::Varchar,
266 AdditionalColumn {
267 column_type: Some(AdditionalColumnType::CollectionName(
268 AdditionalCollectionName {},
269 )),
270 },
271 ),
272 "subject" => ColumnDesc::named_with_additional_column(
273 column_name,
274 column_id,
275 DataType::Varchar, AdditionalColumn {
277 column_type: Some(AdditionalColumnType::Subject(AdditionalSubject {})),
278 },
279 ),
280 _ => unreachable!(),
281 };
282
283 Ok(col_desc)
284}
285
286pub fn source_add_partition_offset_cols(
292 columns: &[ColumnCatalog],
293 connector_name: &str,
294 skip_col_id: bool,
295) -> ([bool; 2], [ColumnDesc; 2]) {
296 let mut columns_exist = [false; 2];
297
298 let mut last_column_id = max_column_id(columns);
299 let mut assign_col_id = || {
300 if skip_col_id {
301 ColumnId::placeholder()
303 } else {
304 last_column_id = last_column_id.next();
305 last_column_id
306 }
307 };
308
309 let additional_columns: Vec<_> = {
310 let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS
311 .get(connector_name)
312 .unwrap_or(&COMMON_COMPATIBLE_ADDITIONAL_COLUMNS);
313 ["partition", "file", "offset"]
314 .iter()
315 .filter_map(|col_type| {
316 if compat_col_types.contains(col_type) {
317 Some(
318 build_additional_column_desc(
319 assign_col_id(),
320 connector_name,
321 col_type,
322 None,
323 None,
324 None,
325 false,
326 false,
327 )
328 .unwrap(),
329 )
330 } else {
331 None
332 }
333 })
334 .collect()
335 };
336 assert_eq!(additional_columns.len(), 2);
337 use risingwave_pb::plan_common::additional_column::ColumnType;
338 assert_matches::assert_matches!(
339 additional_columns[0].additional_column,
340 AdditionalColumn {
341 column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
342 }
343 );
344 assert_matches::assert_matches!(
345 additional_columns[1].additional_column,
346 AdditionalColumn {
347 column_type: Some(ColumnType::Offset(_)),
348 }
349 );
350
351 for col in columns {
353 match col.column_desc.additional_column {
354 AdditionalColumn {
355 column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
356 } => {
357 columns_exist[0] = true;
358 }
359 AdditionalColumn {
360 column_type: Some(ColumnType::Offset(_)),
361 } => {
362 columns_exist[1] = true;
363 }
364 _ => (),
365 }
366 }
367
368 (columns_exist, additional_columns.try_into().unwrap())
369}
370
371fn build_header_catalog(
372 column_id: ColumnId,
373 col_name: &str,
374 inner_field_name: Option<&str>,
375 data_type: Option<&DataType>,
376) -> ColumnDesc {
377 if let Some(inner) = inner_field_name {
378 let data_type = data_type.unwrap_or(&DataType::Bytea);
379 let pb_data_type = data_type.to_protobuf();
380 ColumnDesc::named_with_additional_column(
381 col_name,
382 column_id,
383 data_type.clone(),
384 AdditionalColumn {
385 column_type: Some(AdditionalColumnType::HeaderInner(AdditionalColumnHeader {
386 inner_field: inner.to_owned(),
387 data_type: Some(pb_data_type),
388 })),
389 },
390 )
391 } else {
392 ColumnDesc::named_with_additional_column(
393 col_name,
394 column_id,
395 DataType::List(get_kafka_header_item_datatype().into()),
396 AdditionalColumn {
397 column_type: Some(AdditionalColumnType::Headers(AdditionalColumnHeaders {})),
398 },
399 )
400 }
401}
402
403pub fn get_kafka_header_item_datatype() -> DataType {
404 let struct_inner = vec![("key", DataType::Varchar), ("value", DataType::Bytea)];
405 DataType::Struct(StructType::new(struct_inner))
406}
407
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_gen_default_addition_col_name() {
        // (connector, column type, inner field, data type) -> expected name.
        let cases: [(&str, &str, Option<&str>, Option<&DataType>, &str); 3] = [
            ("kafka", "key", None, None, "_rw_kafka_key"),
            (
                "kafka",
                "header",
                Some("inner"),
                None,
                "_rw_kafka_header_inner",
            ),
            (
                "kafka",
                "header",
                Some("inner"),
                Some(&DataType::Varchar),
                "_rw_kafka_header_inner_varchar",
            ),
        ];
        for (connector, col_type, inner, dt, expected) in cases {
            assert_eq!(
                gen_default_addition_col_name(connector, col_type, inner, dt),
                expected
            );
        }
    }
}