1use std::collections::{HashMap, HashSet};
16use std::sync::LazyLock;
17
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId, max_column_id};
20use risingwave_common::types::{DataType, StructType};
21use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
22use risingwave_pb::plan_common::{
23 AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader,
24 AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset,
25 AdditionalColumnPartition, AdditionalColumnPayload, AdditionalColumnPulsarMessageIdData,
26 AdditionalColumnTimestamp, AdditionalDatabaseName, AdditionalSchemaName, AdditionalSubject,
27 AdditionalTableName,
28};
29
30use crate::error::ConnectorResult;
31use crate::source::cdc::MONGODB_CDC_CONNECTOR;
32use crate::source::{
33 AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, MQTT_CONNECTOR,
34 NATS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
35};
36
/// Additional column types assumed to work for any connector, used as the
/// fallback allow-list when a connector has no dedicated entry in
/// [`COMPATIBLE_ADDITIONAL_COLUMNS`] (and unknown connectors are not rejected).
pub static COMMON_COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashSet<&'static str>> =
    LazyLock::new(|| ["partition", "offset"].into_iter().collect());
40
/// Per-connector allow-lists of additional column types.
///
/// Maps a connector name to the set of additional (metadata) column types it
/// supports. Connectors absent from this map fall back to
/// [`COMMON_COMPATIBLE_ADDITIONAL_COLUMNS`] unless the caller asks to reject
/// unknown connectors (see `build_additional_column_desc`).
pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet<&'static str>>> =
    LazyLock::new(|| {
        HashMap::from([
            (
                KAFKA_CONNECTOR,
                HashSet::from([
                    "key",
                    "timestamp",
                    "partition",
                    "offset",
                    "header",
                    "payload",
                ]),
            ),
            (
                PULSAR_CONNECTOR,
                HashSet::from(["key", "partition", "offset", "payload", "message_id_data"]),
            ),
            (
                KINESIS_CONNECTOR,
                HashSet::from(["key", "partition", "offset", "timestamp", "payload"]),
            ),
            (
                NATS_CONNECTOR,
                HashSet::from(["partition", "offset", "payload", "subject"]),
            ),
            // File-based sources expose "file" instead of "partition".
            (
                OPENDAL_S3_CONNECTOR,
                HashSet::from(["file", "offset", "payload"]),
            ),
            (GCS_CONNECTOR, HashSet::from(["file", "offset", "payload"])),
            (
                AZBLOB_CONNECTOR,
                HashSet::from(["file", "offset", "payload"]),
            ),
            (
                POSIX_FS_CONNECTOR,
                HashSet::from(["file", "offset", "payload"]),
            ),
            // MongoDB CDC uses "collection_name" rather than "table_name".
            (
                MONGODB_CDC_CONNECTOR,
                HashSet::from([
                    "timestamp",
                    "partition",
                    "offset",
                    "database_name",
                    "collection_name",
                ]),
            ),
            (MQTT_CONNECTOR, HashSet::from(["offset", "partition"])),
        ])
    });
94
/// Additional column types available on CDC backfill tables.
///
/// This list is connector-independent: `get_supported_additional_columns`
/// returns it for any connector once the CDC-backfill flag is set.
pub static CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS: LazyLock<Option<HashSet<&'static str>>> =
    LazyLock::new(|| {
        let cols = ["timestamp", "database_name", "schema_name", "table_name"];
        Some(cols.into_iter().collect())
    });
105
106pub fn get_supported_additional_columns(
107 connector_name: &str,
108 is_cdc_backfill: bool,
109) -> Option<&HashSet<&'static str>> {
110 if is_cdc_backfill {
111 CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS.as_ref()
112 } else {
113 COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name)
114 }
115}
116
117pub fn gen_default_addition_col_name(
118 connector_name: &str,
119 additional_col_type: &str,
120 inner_field_name: Option<&str>,
121 data_type: Option<&DataType>,
122) -> String {
123 let legacy_dt_name = data_type.map(|dt| format!("{:?}", dt).to_lowercase());
124 let col_name = [
125 Some(connector_name),
126 Some(additional_col_type),
127 inner_field_name,
128 legacy_dt_name.as_deref(),
129 ];
130 col_name.iter().fold("_rw".to_owned(), |name, ele| {
131 if let Some(ele) = ele {
132 format!("{}_{}", name, ele)
133 } else {
134 name
135 }
136 })
137}
138
139pub fn build_additional_column_desc(
140 column_id: ColumnId,
141 connector_name: &str,
142 additional_col_type: &str,
143 column_alias: Option<String>,
144 inner_field_name: Option<&str>,
145 data_type: Option<&DataType>,
146 reject_unknown_connector: bool,
147 is_cdc_backfill_table: bool,
148) -> ConnectorResult<ColumnDesc> {
149 let compatible_columns = match (
150 get_supported_additional_columns(connector_name, is_cdc_backfill_table),
151 reject_unknown_connector,
152 ) {
153 (Some(compat_cols), _) => compat_cols,
154 (None, false) => &COMMON_COMPATIBLE_ADDITIONAL_COLUMNS,
155 (None, true) => {
156 bail!(
157 "additional column is not supported for connector {}, acceptable connectors: {:?}",
158 connector_name,
159 COMPATIBLE_ADDITIONAL_COLUMNS.keys(),
160 );
161 }
162 };
163 if !compatible_columns.contains(additional_col_type) {
164 bail!(
165 "additional column type {} is not supported for connector {}, acceptable column types: {:?}",
166 additional_col_type,
167 connector_name,
168 compatible_columns
169 );
170 }
171
172 let column_name = column_alias.unwrap_or_else(|| {
173 gen_default_addition_col_name(
174 connector_name,
175 additional_col_type,
176 inner_field_name,
177 data_type,
178 )
179 });
180
181 let col_desc = match additional_col_type {
182 "key" => ColumnDesc::named_with_additional_column(
183 column_name,
184 column_id,
185 DataType::Bytea,
186 AdditionalColumn {
187 column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
188 },
189 ),
190
191 "timestamp" => ColumnDesc::named_with_additional_column(
192 column_name,
193 column_id,
194 DataType::Timestamptz,
195 AdditionalColumn {
196 column_type: Some(AdditionalColumnType::Timestamp(
197 AdditionalColumnTimestamp {},
198 )),
199 },
200 ),
201 "partition" => ColumnDesc::named_with_additional_column(
202 column_name,
203 column_id,
204 DataType::Varchar,
205 AdditionalColumn {
206 column_type: Some(AdditionalColumnType::Partition(
207 AdditionalColumnPartition {},
208 )),
209 },
210 ),
211 "payload" => ColumnDesc::named_with_additional_column(
212 column_name,
213 column_id,
214 DataType::Jsonb,
215 AdditionalColumn {
216 column_type: Some(AdditionalColumnType::Payload(AdditionalColumnPayload {})),
217 },
218 ),
219 "offset" => ColumnDesc::named_with_additional_column(
220 column_name,
221 column_id,
222 DataType::Varchar,
223 AdditionalColumn {
224 column_type: Some(AdditionalColumnType::Offset(AdditionalColumnOffset {})),
225 },
226 ),
227
228 "file" => ColumnDesc::named_with_additional_column(
229 column_name,
230 column_id,
231 DataType::Varchar,
232 AdditionalColumn {
233 column_type: Some(AdditionalColumnType::Filename(AdditionalColumnFilename {})),
234 },
235 ),
236 "header" => build_header_catalog(column_id, &column_name, inner_field_name, data_type),
237 "database_name" => ColumnDesc::named_with_additional_column(
238 column_name,
239 column_id,
240 DataType::Varchar,
241 AdditionalColumn {
242 column_type: Some(AdditionalColumnType::DatabaseName(
243 AdditionalDatabaseName {},
244 )),
245 },
246 ),
247 "schema_name" => ColumnDesc::named_with_additional_column(
248 column_name,
249 column_id,
250 DataType::Varchar,
251 AdditionalColumn {
252 column_type: Some(AdditionalColumnType::SchemaName(AdditionalSchemaName {})),
253 },
254 ),
255 "table_name" => ColumnDesc::named_with_additional_column(
256 column_name,
257 column_id,
258 DataType::Varchar,
259 AdditionalColumn {
260 column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})),
261 },
262 ),
263 "collection_name" => ColumnDesc::named_with_additional_column(
264 column_name,
265 column_id,
266 DataType::Varchar,
267 AdditionalColumn {
268 column_type: Some(AdditionalColumnType::CollectionName(
269 AdditionalCollectionName {},
270 )),
271 },
272 ),
273 "subject" => ColumnDesc::named_with_additional_column(
274 column_name,
275 column_id,
276 DataType::Varchar, AdditionalColumn {
278 column_type: Some(AdditionalColumnType::Subject(AdditionalSubject {})),
279 },
280 ),
281 "message_id_data" => ColumnDesc::named_with_additional_column(
282 column_name,
283 column_id,
284 DataType::Bytea,
285 AdditionalColumn {
286 column_type: Some(AdditionalColumnType::PulsarMessageIdData(
287 AdditionalColumnPulsarMessageIdData {},
288 )),
289 },
290 ),
291 _ => unreachable!(),
292 };
293
294 Ok(col_desc)
295}
296
297pub fn derive_pulsar_message_id_data_column(
298 connector_name: &str,
299 column_exist: &mut Vec<bool>,
300 additional_columns: &mut Vec<ColumnDesc>,
301) {
302 let max_column_id = additional_columns
305 .iter()
306 .fold(ColumnId::first_user_column(), |a, b| a.max(b.column_id));
307
308 column_exist.push(false);
310 additional_columns.push(
311 build_additional_column_desc(
312 max_column_id.next(),
313 connector_name,
314 "message_id_data",
315 None,
316 None,
317 None,
318 false,
319 false,
320 )
321 .unwrap(),
322 );
323}
324
/// Determine which of the partition-like and offset metadata columns already
/// exist in `columns`, and build descriptors for both so the caller can append
/// the missing ones.
///
/// Returns `(columns_exist, additional_columns)`, each of length 2:
/// index 0 is the partition-like column (`partition` or `file`, depending on
/// the connector) and index 1 is the `offset` column. When `skip_col_id` is
/// true the new descriptors carry placeholder ids; otherwise ids continue from
/// the maximum column id found in `columns`.
pub fn source_add_partition_offset_cols(
    columns: &[ColumnCatalog],
    connector_name: &str,
    skip_col_id: bool,
) -> (Vec<bool>, Vec<ColumnDesc>) {
    // [partition-like column present, offset column present]
    let mut columns_exist = vec![false; 2];

    let mut last_column_id = max_column_id(columns);
    // Hands out ids in iteration order below; mutation of `last_column_id`
    // makes the assignment order significant.
    let mut assign_col_id = || {
        if skip_col_id {
            // Column ids will not be used by the caller; use a placeholder.
            ColumnId::placeholder()
        } else {
            last_column_id = last_column_id.next();
            last_column_id
        }
    };

    let additional_columns: Vec<_> = {
        let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS
            .get(connector_name)
            .unwrap_or(&COMMON_COMPATIBLE_ADDITIONAL_COLUMNS);
        // Every supported connector provides exactly one of "partition"/"file"
        // plus "offset", so this filter is expected to yield two descriptors.
        ["partition", "file", "offset"]
            .iter()
            .filter_map(|col_type| {
                if compat_col_types.contains(col_type) {
                    Some(
                        build_additional_column_desc(
                            assign_col_id(),
                            connector_name,
                            col_type,
                            None,
                            None,
                            None,
                            false,
                            false,
                        )
                        .unwrap(),
                    )
                } else {
                    None
                }
            })
            .collect()
    };
    assert_eq!(additional_columns.len(), 2);
    use risingwave_pb::plan_common::additional_column::ColumnType;
    // Sanity-check the fixed ordering assumed by the return contract.
    assert_matches::assert_matches!(
        additional_columns[0].additional_column,
        AdditionalColumn {
            column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
        }
    );
    assert_matches::assert_matches!(
        additional_columns[1].additional_column,
        AdditionalColumn {
            column_type: Some(ColumnType::Offset(_)),
        }
    );

    // Mark which of the two columns already exist in the input schema.
    for col in columns {
        match col.column_desc.additional_column {
            AdditionalColumn {
                column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
            } => {
                columns_exist[0] = true;
            }
            AdditionalColumn {
                column_type: Some(ColumnType::Offset(_)),
            } => {
                columns_exist[1] = true;
            }
            _ => (),
        }
    }

    (columns_exist, additional_columns)
}
409
410fn build_header_catalog(
411 column_id: ColumnId,
412 col_name: &str,
413 inner_field_name: Option<&str>,
414 data_type: Option<&DataType>,
415) -> ColumnDesc {
416 if let Some(inner) = inner_field_name {
417 let data_type = data_type.unwrap_or(&DataType::Bytea);
418 let pb_data_type = data_type.to_protobuf();
419 ColumnDesc::named_with_additional_column(
420 col_name,
421 column_id,
422 data_type.clone(),
423 AdditionalColumn {
424 column_type: Some(AdditionalColumnType::HeaderInner(AdditionalColumnHeader {
425 inner_field: inner.to_owned(),
426 data_type: Some(pb_data_type),
427 })),
428 },
429 )
430 } else {
431 ColumnDesc::named_with_additional_column(
432 col_name,
433 column_id,
434 DataType::list(get_kafka_header_item_datatype()),
435 AdditionalColumn {
436 column_type: Some(AdditionalColumnType::Headers(AdditionalColumnHeaders {})),
437 },
438 )
439 }
440}
441
442pub fn get_kafka_header_item_datatype() -> DataType {
443 let struct_inner = vec![("key", DataType::Varchar), ("value", DataType::Bytea)];
444 DataType::Struct(StructType::new(struct_inner))
445}
446
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_gen_default_addition_col_name() {
        // (connector, column type, inner field, data type) -> expected name
        let cases = [
            ("kafka", "key", None, None, "_rw_kafka_key"),
            ("kafka", "header", Some("inner"), None, "_rw_kafka_header_inner"),
            (
                "kafka",
                "header",
                Some("inner"),
                Some(&DataType::Varchar),
                "_rw_kafka_header_inner_varchar",
            ),
        ];
        for (connector, col_type, inner_field, data_type, expected) in cases {
            assert_eq!(
                gen_default_addition_col_name(connector, col_type, inner_field, data_type),
                expected
            );
        }
    }
}