1use std::collections::{HashMap, HashSet};
16use std::sync::LazyLock;
17
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId, max_column_id};
20use risingwave_common::types::{DataType, StructType};
21use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
22use risingwave_pb::plan_common::{
23 AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader,
24 AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset,
25 AdditionalColumnPartition, AdditionalColumnPayload, AdditionalColumnPulsarMessageIdData,
26 AdditionalColumnTimestamp, AdditionalDatabaseName, AdditionalSchemaName, AdditionalSubject,
27 AdditionalTableName,
28};
29
30use crate::error::ConnectorResult;
31use crate::source::cdc::MONGODB_CDC_CONNECTOR;
32use crate::source::{
33 AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, MQTT_CONNECTOR,
34 NATS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
35};
36
/// Additional column types accepted for a connector that has no entry in
/// [`COMPATIBLE_ADDITIONAL_COLUMNS`] (used when unknown connectors are not rejected).
pub static COMMON_COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashSet<&'static str>> =
    LazyLock::new(|| ["partition", "offset"].into_iter().collect());
40
41pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet<&'static str>>> =
42 LazyLock::new(|| {
43 HashMap::from([
44 (
45 KAFKA_CONNECTOR,
46 HashSet::from([
47 "key",
48 "timestamp",
49 "partition",
50 "offset",
51 "header",
52 "payload",
53 ]),
54 ),
55 (
56 PULSAR_CONNECTOR,
57 HashSet::from([
58 "key",
59 "partition",
60 "offset",
61 "payload",
62 "message_id_data",
63 "header",
64 ]),
65 ),
66 (
67 KINESIS_CONNECTOR,
68 HashSet::from(["key", "partition", "offset", "timestamp", "payload"]),
69 ),
70 (
71 NATS_CONNECTOR,
72 HashSet::from(["partition", "offset", "payload", "subject"]),
73 ),
74 (
75 OPENDAL_S3_CONNECTOR,
76 HashSet::from(["file", "offset", "payload"]),
77 ),
78 (GCS_CONNECTOR, HashSet::from(["file", "offset", "payload"])),
79 (
80 AZBLOB_CONNECTOR,
81 HashSet::from(["file", "offset", "payload"]),
82 ),
83 (
84 POSIX_FS_CONNECTOR,
85 HashSet::from(["file", "offset", "payload"]),
86 ),
87 (
89 MONGODB_CDC_CONNECTOR,
90 HashSet::from([
91 "timestamp",
92 "partition",
93 "offset",
94 "database_name",
95 "collection_name",
96 ]),
97 ),
98 (MQTT_CONNECTOR, HashSet::from(["offset", "partition"])),
99 ])
100 });
101
/// Additional column types allowed on CDC backfill tables, regardless of connector.
///
/// Always `Some`; the `Option` lets callers treat it like a map lookup result.
pub static CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS: LazyLock<Option<HashSet<&'static str>>> =
    LazyLock::new(|| {
        let allowed = ["timestamp", "database_name", "schema_name", "table_name"];
        Some(allowed.into_iter().collect())
    });
112
113pub fn get_supported_additional_columns(
114 connector_name: &str,
115 is_cdc_backfill: bool,
116) -> Option<&HashSet<&'static str>> {
117 if is_cdc_backfill {
118 CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS.as_ref()
119 } else {
120 COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name)
121 }
122}
123
124pub fn gen_default_addition_col_name(
125 connector_name: &str,
126 additional_col_type: &str,
127 inner_field_name: Option<&str>,
128 data_type: Option<&DataType>,
129) -> String {
130 let legacy_dt_name = data_type.map(|dt| format!("{:?}", dt).to_lowercase());
131 let col_name = [
132 Some(connector_name),
133 Some(additional_col_type),
134 inner_field_name,
135 legacy_dt_name.as_deref(),
136 ];
137 col_name.iter().fold("_rw".to_owned(), |name, ele| {
138 if let Some(ele) = ele {
139 format!("{}_{}", name, ele)
140 } else {
141 name
142 }
143 })
144}
145
146pub fn build_additional_column_desc(
147 column_id: ColumnId,
148 connector_name: &str,
149 additional_col_type: &str,
150 column_alias: Option<String>,
151 inner_field_name: Option<&str>,
152 data_type: Option<&DataType>,
153 reject_unknown_connector: bool,
154 is_cdc_backfill_table: bool,
155) -> ConnectorResult<ColumnDesc> {
156 let compatible_columns = match (
157 get_supported_additional_columns(connector_name, is_cdc_backfill_table),
158 reject_unknown_connector,
159 ) {
160 (Some(compat_cols), _) => compat_cols,
161 (None, false) => &COMMON_COMPATIBLE_ADDITIONAL_COLUMNS,
162 (None, true) => {
163 bail!(
164 "additional column is not supported for connector {}, acceptable connectors: {:?}",
165 connector_name,
166 COMPATIBLE_ADDITIONAL_COLUMNS.keys(),
167 );
168 }
169 };
170 if !compatible_columns.contains(additional_col_type) {
171 bail!(
172 "additional column type {} is not supported for connector {}, acceptable column types: {:?}",
173 additional_col_type,
174 connector_name,
175 compatible_columns
176 );
177 }
178
179 let column_name = column_alias.unwrap_or_else(|| {
180 gen_default_addition_col_name(
181 connector_name,
182 additional_col_type,
183 inner_field_name,
184 data_type,
185 )
186 });
187
188 let col_desc = match additional_col_type {
189 "key" => ColumnDesc::named_with_additional_column(
190 column_name,
191 column_id,
192 DataType::Bytea,
193 AdditionalColumn {
194 column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
195 },
196 ),
197
198 "timestamp" => ColumnDesc::named_with_additional_column(
199 column_name,
200 column_id,
201 DataType::Timestamptz,
202 AdditionalColumn {
203 column_type: Some(AdditionalColumnType::Timestamp(
204 AdditionalColumnTimestamp {},
205 )),
206 },
207 ),
208 "partition" => ColumnDesc::named_with_additional_column(
209 column_name,
210 column_id,
211 DataType::Varchar,
212 AdditionalColumn {
213 column_type: Some(AdditionalColumnType::Partition(
214 AdditionalColumnPartition {},
215 )),
216 },
217 ),
218 "payload" => ColumnDesc::named_with_additional_column(
219 column_name,
220 column_id,
221 DataType::Jsonb,
222 AdditionalColumn {
223 column_type: Some(AdditionalColumnType::Payload(AdditionalColumnPayload {})),
224 },
225 ),
226 "offset" => ColumnDesc::named_with_additional_column(
227 column_name,
228 column_id,
229 DataType::Varchar,
230 AdditionalColumn {
231 column_type: Some(AdditionalColumnType::Offset(AdditionalColumnOffset {})),
232 },
233 ),
234
235 "file" => ColumnDesc::named_with_additional_column(
236 column_name,
237 column_id,
238 DataType::Varchar,
239 AdditionalColumn {
240 column_type: Some(AdditionalColumnType::Filename(AdditionalColumnFilename {})),
241 },
242 ),
243 "header" => build_header_catalog(column_id, &column_name, inner_field_name, data_type),
244 "database_name" => ColumnDesc::named_with_additional_column(
245 column_name,
246 column_id,
247 DataType::Varchar,
248 AdditionalColumn {
249 column_type: Some(AdditionalColumnType::DatabaseName(
250 AdditionalDatabaseName {},
251 )),
252 },
253 ),
254 "schema_name" => ColumnDesc::named_with_additional_column(
255 column_name,
256 column_id,
257 DataType::Varchar,
258 AdditionalColumn {
259 column_type: Some(AdditionalColumnType::SchemaName(AdditionalSchemaName {})),
260 },
261 ),
262 "table_name" => ColumnDesc::named_with_additional_column(
263 column_name,
264 column_id,
265 DataType::Varchar,
266 AdditionalColumn {
267 column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})),
268 },
269 ),
270 "collection_name" => ColumnDesc::named_with_additional_column(
271 column_name,
272 column_id,
273 DataType::Varchar,
274 AdditionalColumn {
275 column_type: Some(AdditionalColumnType::CollectionName(
276 AdditionalCollectionName {},
277 )),
278 },
279 ),
280 "subject" => ColumnDesc::named_with_additional_column(
281 column_name,
282 column_id,
283 DataType::Varchar, AdditionalColumn {
285 column_type: Some(AdditionalColumnType::Subject(AdditionalSubject {})),
286 },
287 ),
288 "message_id_data" => ColumnDesc::named_with_additional_column(
289 column_name,
290 column_id,
291 DataType::Bytea,
292 AdditionalColumn {
293 column_type: Some(AdditionalColumnType::PulsarMessageIdData(
294 AdditionalColumnPulsarMessageIdData {},
295 )),
296 },
297 ),
298 _ => unreachable!(),
299 };
300
301 Ok(col_desc)
302}
303
304pub fn derive_pulsar_message_id_data_column(
305 connector_name: &str,
306 column_exist: &mut Vec<bool>,
307 additional_columns: &mut Vec<ColumnDesc>,
308) {
309 let max_column_id = additional_columns
312 .iter()
313 .fold(ColumnId::first_user_column(), |a, b| a.max(b.column_id));
314
315 column_exist.push(false);
317 additional_columns.push(
318 build_additional_column_desc(
319 max_column_id.next(),
320 connector_name,
321 "message_id_data",
322 None,
323 None,
324 None,
325 false,
326 false,
327 )
328 .unwrap(),
329 );
330}
331
332pub fn source_add_partition_offset_cols(
338 columns: &[ColumnCatalog],
339 connector_name: &str,
340 skip_col_id: bool,
341) -> (Vec<bool>, Vec<ColumnDesc>) {
342 let mut columns_exist = vec![false; 2];
343
344 let mut last_column_id = max_column_id(columns);
345 let mut assign_col_id = || {
346 if skip_col_id {
347 ColumnId::placeholder()
349 } else {
350 last_column_id = last_column_id.next();
351 last_column_id
352 }
353 };
354
355 let additional_columns: Vec<_> = {
356 let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS
357 .get(connector_name)
358 .unwrap_or(&COMMON_COMPATIBLE_ADDITIONAL_COLUMNS);
359 ["partition", "file", "offset"]
360 .iter()
361 .filter_map(|col_type| {
362 if compat_col_types.contains(col_type) {
363 Some(
364 build_additional_column_desc(
365 assign_col_id(),
366 connector_name,
367 col_type,
368 None,
369 None,
370 None,
371 false,
372 false,
373 )
374 .unwrap(),
375 )
376 } else {
377 None
378 }
379 })
380 .collect()
381 };
382 assert_eq!(additional_columns.len(), 2);
383 use risingwave_pb::plan_common::additional_column::ColumnType;
384 assert_matches::assert_matches!(
385 additional_columns[0].additional_column,
386 AdditionalColumn {
387 column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
388 }
389 );
390 assert_matches::assert_matches!(
391 additional_columns[1].additional_column,
392 AdditionalColumn {
393 column_type: Some(ColumnType::Offset(_)),
394 }
395 );
396
397 for col in columns {
399 match col.column_desc.additional_column {
400 AdditionalColumn {
401 column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
402 } => {
403 columns_exist[0] = true;
404 }
405 AdditionalColumn {
406 column_type: Some(ColumnType::Offset(_)),
407 } => {
408 columns_exist[1] = true;
409 }
410 _ => (),
411 }
412 }
413
414 (columns_exist, additional_columns)
415}
416
417fn build_header_catalog(
418 column_id: ColumnId,
419 col_name: &str,
420 inner_field_name: Option<&str>,
421 data_type: Option<&DataType>,
422) -> ColumnDesc {
423 if let Some(inner) = inner_field_name {
424 let data_type = data_type.unwrap_or(&DataType::Bytea);
425 let pb_data_type = data_type.to_protobuf();
426 ColumnDesc::named_with_additional_column(
427 col_name,
428 column_id,
429 data_type.clone(),
430 AdditionalColumn {
431 column_type: Some(AdditionalColumnType::HeaderInner(AdditionalColumnHeader {
432 inner_field: inner.to_owned(),
433 data_type: Some(pb_data_type),
434 })),
435 },
436 )
437 } else {
438 ColumnDesc::named_with_additional_column(
439 col_name,
440 column_id,
441 DataType::list(get_kafka_header_item_datatype()),
442 AdditionalColumn {
443 column_type: Some(AdditionalColumnType::Headers(AdditionalColumnHeaders {})),
444 },
445 )
446 }
447}
448
449pub fn get_kafka_header_item_datatype() -> DataType {
450 let struct_inner = vec![("key", DataType::Varchar), ("value", DataType::Bytea)];
451 DataType::Struct(StructType::new(struct_inner))
452}
453
#[cfg(test)]
mod test {
    use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;

    use super::*;

    #[test]
    fn test_gen_default_addition_col_name() {
        assert_eq!(
            gen_default_addition_col_name("kafka", "key", None, None),
            "_rw_kafka_key"
        );
        assert_eq!(
            gen_default_addition_col_name("kafka", "header", Some("inner"), None),
            "_rw_kafka_header_inner"
        );
        assert_eq!(
            gen_default_addition_col_name(
                "kafka",
                "header",
                Some("inner"),
                Some(&DataType::Varchar)
            ),
            "_rw_kafka_header_inner_varchar"
        );
    }

    #[test]
    fn test_build_pulsar_header_additional_column() {
        let col = build_additional_column_desc(
            ColumnId::new(1),
            PULSAR_CONNECTOR,
            "header",
            Some("pulsar_header".to_owned()),
            Some("tenant"),
            Some(&DataType::Varchar),
            true,
            false,
        )
        .unwrap();

        assert_eq!(col.name, "pulsar_header");
        assert_eq!(col.data_type, DataType::Varchar);
        assert_matches::assert_matches!(
            col.additional_column.column_type,
            Some(AdditionalColumnType::HeaderInner(ref header))
            if header.inner_field == "tenant"
        );
    }

    #[test]
    fn test_reject_unsupported_additional_column() {
        // A known connector rejects column types outside its compatible set.
        assert!(
            build_additional_column_desc(
                ColumnId::new(1),
                KAFKA_CONNECTOR,
                "message_id_data",
                None,
                None,
                None,
                false,
                false,
            )
            .is_err()
        );
        // An unknown connector is rejected outright when `reject_unknown_connector` is set.
        assert!(
            build_additional_column_desc(
                ColumnId::new(1),
                "unknown_connector",
                "offset",
                None,
                None,
                None,
                true,
                false,
            )
            .is_err()
        );
    }

    #[test]
    fn test_unknown_connector_falls_back_to_common_columns() {
        let col = build_additional_column_desc(
            ColumnId::new(2),
            "unknown_connector",
            "offset",
            None,
            None,
            None,
            false,
            false,
        )
        .unwrap();
        assert_eq!(col.name, "_rw_unknown_connector_offset");
        assert_eq!(col.data_type, DataType::Varchar);
        assert_matches::assert_matches!(
            col.additional_column.column_type,
            Some(AdditionalColumnType::Offset(_))
        );

        // The fallback only admits the common columns.
        assert!(
            build_additional_column_desc(
                ColumnId::new(3),
                "unknown_connector",
                "key",
                None,
                None,
                None,
                false,
                false,
            )
            .is_err()
        );
    }
}