use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use rw_futures_util::select_all;
use serde::de::DeserializeOwned;
use serde_json::json;
use tokio::sync::mpsc;

use super::cdc::DebeziumCdcMeta;
use super::datagen::DatagenMeta;
use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::kinesis::KinesisMeta;
use super::monitor::SourceMetrics;
use super::nats::source::NatsMeta;
use super::nexmark::source::message::NexmarkMeta;
use super::pulsar::source::PulsarMeta;
use super::{AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult as Result;
use crate::parser::ParserConfig;
use crate::parser::schema_change::SchemaChangeEnvelope;
use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
use crate::source::monitor::EnumeratorMetrics;
use crate::with_options::WithOptions;
use crate::{
    WithOptionsSecResolved, WithPropertiesExt, dispatch_source_prop, dispatch_split_impl,
    for_all_connections, for_all_sources, impl_connection, impl_connector_properties, impl_split,
    match_source_name_str,
};

const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

pub const WEBHOOK_CONNECTOR: &str = "webhook";

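/// Parses connector properties from the string key-value pairs of a `WITH` clause. The blanket
/// implementation below goes through `serde_json`, so any `Deserialize` properties struct gets
/// this for free. A hedged sketch of a conforming struct (`MyProps` is illustrative, not part
/// of this crate):
///
/// ```ignore
/// #[derive(Clone, Debug, Deserialize)]
/// struct MyProps {
///     topic: String,
///     // Collects options the struct doesn't recognize, so `deny_unknown_fields` can be
///     // enforced after deserialization.
///     #[serde(flatten)]
///     unknown: HashMap<String, String>,
/// }
///
/// impl UnknownFields for MyProps {
///     fn unknown_fields(&self) -> HashMap<String, String> {
///         self.unknown.clone()
///     }
/// }
///
/// let props = MyProps::try_from_btreemap(options, /* deny_unknown_fields */ true)?;
/// ```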
pub trait TryFromBTreeMap: Sized + UnknownFields {
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self>;
}
75
76pub trait SourceProperties:
80 TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug + EnforceSecret
81{
82 const SOURCE_NAME: &'static str;
83 type Split: SplitMetaData
84 + TryFrom<SplitImpl, Error = crate::error::ConnectorError>
85 + Into<SplitImpl>;
86 type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
87 type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;
88
89 fn init_from_pb_source(&mut self, _source: &PbSource) {}
91
92 fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
94}

pub trait UnknownFields {
    /// Fields in the `WITH` clause that were not recognized by the typed properties struct.
    fn unknown_fields(&self) -> HashMap<String, String>;
}

impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let json_value = serde_json::to_value(props)?;
        let res = serde_json::from_value::<P>(json_value)?;

        if !deny_unknown_fields || res.unknown_fields().is_empty() {
            Ok(res)
        } else {
            bail!(
                "Unknown fields in the WITH clause: {:?}",
                res.unknown_fields()
            )
        }
    }
}

#[derive(Default)]
pub struct CreateSplitReaderOpt {
    pub support_multiple_splits: bool,
    pub seek_to_latest: bool,
}

#[derive(Default)]
pub struct CreateSplitReaderResult {
    pub latest_splits: Option<Vec<SplitImpl>>,
    pub backfill_info: HashMap<SplitId, BackfillInfo>,
}

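/// Creates a chunk stream from `splits` using the given connector properties. Depending on
/// `opt.support_multiple_splits`, either a single reader takes all splits, or one reader is
/// spawned per split and their streams are merged. A minimal calling sketch (hedged: the
/// surrounding setup values are assumed, not shown in this file):
///
/// ```ignore
/// let (stream, result) = create_split_readers(
///     kafka_props,                  // some `P: SourceProperties`
///     splits,                       // `Vec<SplitImpl>` assigned to this reader
///     parser_config,
///     SourceContext::dummy().into(),
///     None,                         // no column pruning
///     CreateSplitReaderOpt::default(),
/// )
/// .await?;
/// ```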
pub async fn create_split_readers<P: SourceProperties>(
    prop: P,
    splits: Vec<SplitImpl>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
    opt: CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> {
    let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
    let mut res = CreateSplitReaderResult {
        backfill_info: HashMap::new(),
        latest_splits: None,
    };
    if opt.support_multiple_splits {
        let mut reader = P::SplitReader::new(
            prop.clone(),
            splits,
            parser_config.clone(),
            source_ctx.clone(),
            columns.clone(),
        )
        .await?;
        if opt.seek_to_latest {
            res.latest_splits = Some(reader.seek_to_latest().await?);
        }
        res.backfill_info = reader.backfill_info();
        Ok((reader.into_stream().boxed(), res))
    } else {
        let mut readers = try_join_all(splits.into_iter().map(|split| {
            P::SplitReader::new(
                prop.clone(),
                vec![split],
                parser_config.clone(),
                source_ctx.clone(),
                columns.clone(),
            )
        }))
        .await?;
        if opt.seek_to_latest {
            let mut latest_splits = vec![];
            for reader in &mut readers {
                latest_splits.extend(reader.seek_to_latest().await?);
            }
            res.latest_splits = Some(latest_splits);
        }
        res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
        Ok((
            select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
            res,
        ))
    }
}

/// A [`SplitEnumerator`] fetches split metadata (partitions, shards, table chunks, ...) from
/// the external source service; the discovered splits are then assigned to source readers.
#[async_trait]
pub trait SplitEnumerator: Sized + Send {
    type Split: SplitMetaData + Send;
    type Properties;

    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> Result<Self>;
    async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
    /// Called when fragments are dropped, so the enumerator can clean up any per-fragment
    /// state. No-op by default.
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
    /// Called when backfill finishes for the given fragments. No-op by default.
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
}

pub type SourceContextRef = Arc<SourceContext>;
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;

/// Object-safe, type-erased counterpart of [`SplitEnumerator`]: it always speaks in terms of
/// [`SplitImpl`], so enumerators for different connectors can be held behind a single
/// `Box<dyn AnySplitEnumerator>`.
#[async_trait]
pub trait AnySplitEnumerator: Send {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
    async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> Result<()>;
    async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> Result<()>;
}

#[async_trait]
impl<T: SplitEnumerator<Split: Into<SplitImpl>>> AnySplitEnumerator for T {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
        SplitEnumerator::list_splits(self)
            .await
            .map(|s| s.into_iter().map(|s| s.into()).collect())
    }

    async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_drop_fragments(self, fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_finish_backfill(self, fragment_ids).await
    }
}

pub const MAX_CHUNK_SIZE: usize = 1024;

#[derive(Debug, Clone, Copy)]
pub struct SourceCtrlOpts {
    /// The maximum number of rows in a chunk yielded by the source stream.
    pub chunk_size: usize,
    /// Whether a transaction may be split into multiple chunks to respect `chunk_size`.
    pub split_txn: bool,
}

// `Default` is deliberately opted out (negative impl) so that callers must set every field
// explicitly instead of silently picking up implicit values, e.g.
// `SourceCtrlOpts { chunk_size: MAX_CHUNK_SIZE, split_txn: false }`.
impl !Default for SourceCtrlOpts {}

impl SourceCtrlOpts {
    #[cfg(test)]
    pub fn for_test() -> Self {
        SourceCtrlOpts {
            chunk_size: 256,
            split_txn: false,
        }
    }
}

#[derive(Debug)]
pub struct SourceEnumeratorContext {
    pub info: SourceEnumeratorInfo,
    pub metrics: Arc<EnumeratorMetrics>,
}

impl SourceEnumeratorContext {
    /// Creates a dummy context for testing purposes.
    pub fn dummy() -> SourceEnumeratorContext {
        SourceEnumeratorContext {
            info: SourceEnumeratorInfo { source_id: 0 },
            metrics: Arc::new(EnumeratorMetrics::default()),
        }
    }
}

#[derive(Clone, Debug)]
pub struct SourceEnumeratorInfo {
    pub source_id: u32,
}

#[derive(Debug, Clone)]
pub struct SourceContext {
    pub actor_id: u32,
    pub source_id: TableId,
    pub fragment_id: u32,
    pub source_name: String,
    pub metrics: Arc<SourceMetrics>,
    pub source_ctrl_opts: SourceCtrlOpts,
    pub connector_props: ConnectorProperties,
    /// Channel for reporting schema change events; each event is paired with a oneshot sender
    /// that the receiver uses to signal back once the change has been handled.
    pub schema_change_tx:
        Option<mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>>,
}

impl SourceContext {
    pub fn new(
        actor_id: u32,
        source_id: TableId,
        fragment_id: u32,
        source_name: String,
        metrics: Arc<SourceMetrics>,
        source_ctrl_opts: SourceCtrlOpts,
        connector_props: ConnectorProperties,
        schema_change_channel: Option<
            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
        >,
    ) -> Self {
        Self {
            actor_id,
            source_id,
            fragment_id,
            source_name,
            metrics,
            source_ctrl_opts,
            connector_props,
            schema_change_tx: schema_change_channel,
        }
    }

    /// Creates a dummy context for testing purposes.
    pub fn dummy() -> Self {
        Self::new(
            0,
            TableId::new(0),
            0,
            "dummy".to_owned(),
            Arc::new(SourceMetrics::default()),
            SourceCtrlOpts {
                chunk_size: MAX_CHUNK_SIZE,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        )
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceFormat {
    #[default]
    Invalid,
    Native,
    None,
    Debezium,
    DebeziumMongo,
    Maxwell,
    Canal,
    Upsert,
    Plain,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceEncode {
    #[default]
    Invalid,
    Native,
    None,
    Avro,
    Csv,
    Protobuf,
    Json,
    Bytes,
    Parquet,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SourceStruct {
    pub format: SourceFormat,
    pub encode: SourceEncode,
}

impl SourceStruct {
    pub fn new(format: SourceFormat, encode: SourceEncode) -> Self {
        Self { format, encode }
    }
}

/// Extracts the `(format, encode)` pair from a `PbStreamSourceInfo`. The legacy `row_format`
/// field is honored first for backward compatibility; newer plans carry `format` and
/// `row_encode` as two separate fields.
pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};

    // Legacy path: the deprecated `row_format` field.
    if let Ok(format) = info.get_row_format() {
        let (format, encode) = match format {
            RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
            RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
            RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
            RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
            RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
            RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
            RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
            RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
            RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
            RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
            RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
            RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json),
            RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
            RowFormatType::RowUnspecified => unreachable!(),
        };
        return Ok(SourceStruct::new(format, encode));
    }
    let source_format = info.get_format()?;
    let source_encode = info.get_row_encode()?;
    let (format, encode) = match (source_format, source_encode) {
        (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Protobuf) => {
            (SourceFormat::Plain, SourceEncode::Protobuf)
        }
        (PbFormatType::Debezium, PbEncodeType::Json) => {
            (SourceFormat::Debezium, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro),
        (PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json),
        (PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv),
        (PbFormatType::Plain, PbEncodeType::Parquet) => {
            (SourceFormat::Plain, SourceEncode::Parquet)
        }
        (PbFormatType::Native, PbEncodeType::Native) => {
            (SourceFormat::Native, SourceEncode::Native)
        }
        (PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
        (PbFormatType::Debezium, PbEncodeType::Avro) => {
            (SourceFormat::Debezium, SourceEncode::Avro)
        }
        (PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json),
        (PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro),
        (PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
            (SourceFormat::DebeziumMongo, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes),
        (PbFormatType::Upsert, PbEncodeType::Protobuf) => {
            (SourceFormat::Upsert, SourceEncode::Protobuf)
        }
        (format, encode) => {
            bail!(
                "Unsupported combination of format {:?} and encode {:?}",
                format,
                encode
            );
        }
    };
    Ok(SourceStruct::new(format, encode))
}

pub type BoxSourceMessageStream =
    BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
pub type BoxSourceChunkWithStateStream =
    BoxStream<'static, crate::error::ConnectorResult<StreamChunkWithState>>;

pub type BoxStreamingFileSourceChunkStream =
    BoxStream<'static, crate::error::ConnectorResult<Option<StreamChunk>>>;

pub trait SourceChunkStream:
    Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}
impl<T> SourceChunkStream for T where
    T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}

pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;

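/// A [`SplitReader`] pulls raw messages for its assigned splits from the external system and
/// turns them into a stream of parsed [`StreamChunk`]s. A minimal implementor sketch (hedged:
/// `MyReader`, `MyProperties`, `MySplit`, and the trivial bodies are illustrative, not part of
/// this crate):
///
/// ```ignore
/// struct MyReader { splits: Vec<MySplit> }
///
/// #[async_trait]
/// impl SplitReader for MyReader {
///     type Properties = MyProperties;
///     type Split = MySplit;
///
///     async fn new(
///         _props: MyProperties,
///         splits: Vec<MySplit>,
///         _parser_config: ParserConfig,
///         _source_ctx: SourceContextRef,
///         _columns: Option<Vec<Column>>,
///     ) -> crate::error::ConnectorResult<Self> {
///         Ok(MyReader { splits })
///     }
///
///     fn into_stream(self) -> BoxSourceChunkStream {
///         // A real reader would poll the external system and yield chunks here.
///         futures::stream::empty().boxed()
///     }
/// }
/// ```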
#[async_trait]
pub trait SplitReader: Sized + Send {
    type Properties;
    type Split: SplitMetaData;

    async fn new(
        properties: Self::Properties,
        state: Vec<Self::Split>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> crate::error::ConnectorResult<Self>;

    fn into_stream(self) -> BoxSourceChunkStream;

    /// Backfill information for each split. Connectors that do not support source backfill
    /// simply return an empty map.
    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        HashMap::new()
    }

    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        Err(anyhow!("seek_to_latest is not supported for this connector").into())
    }
}

/// Information reported by a reader when it is created, used to decide whether a split needs
/// backfilling.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
    HasDataToBackfill {
        /// The latest offset observed in the split at reader creation time.
        latest_offset: String,
    },
    /// The split has no historical data to backfill.
    NoDataToBackfill,
}

for_all_sources!(impl_connector_properties);

impl Default for ConnectorProperties {
    fn default() -> Self {
        ConnectorProperties::Test(Box::default())
    }
}

impl ConnectorProperties {
    pub fn is_new_fs_connector_hash_map(with_properties: &HashMap<String, String>) -> bool {
        with_properties
            .get(UPSTREAM_SOURCE_KEY)
            .map(|s| {
                s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
                    || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
                    || s.eq_ignore_ascii_case(GCS_CONNECTOR)
                    || s.eq_ignore_ascii_case(AZBLOB_CONNECTOR)
            })
            .unwrap_or(false)
    }
}

impl ConnectorProperties {
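    /// Resolves secret references in `with_properties`, then dispatches on the `connector`
    /// field to parse the remaining options into the matching typed properties. A hedged
    /// usage sketch, mirroring the tests at the bottom of this file:
    ///
    /// ```ignore
    /// let props = convert_args!(btreemap!(
    ///     "connector" => "kafka",
    ///     "properties.bootstrap.server" => "b1,b2",
    ///     "topic" => "test",
    /// ));
    /// let props =
    ///     ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)?;
    /// ```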
    pub fn extract(
        with_properties: WithOptionsSecResolved,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let (options, secret_refs) = with_properties.into_parts();
        let mut options_with_secret =
            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
        let connector = options_with_secret
            .remove(UPSTREAM_SOURCE_KEY)
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::try_from_btreemap(options_with_secret, deny_unknown_fields)
                .map(ConnectorProperties::from),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn enforce_secret_source(
        with_properties: &impl WithPropertiesExt,
    ) -> crate::error::ConnectorResult<()> {
        let connector = with_properties
            .get_connector()
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        let key_iter = with_properties.key_iter();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::enforce_secret(key_iter),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn enable_drop_split(&self) -> bool {
        matches!(
            self,
            ConnectorProperties::Kinesis(_) | ConnectorProperties::Nats(_)
        )
    }

    pub fn enable_adaptive_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Nats(_))
    }

    pub fn init_from_pb_source(&mut self, source: &PbSource) {
        dispatch_source_prop!(self, |prop| prop.init_from_pb_source(source))
    }

    pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
        dispatch_source_prop!(self, |prop| prop
            .init_from_pb_cdc_table_desc(cdc_table_desc))
    }

    pub fn support_multiple_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Kafka(_))
            || matches!(self, ConnectorProperties::OpendalS3(_))
            || matches!(self, ConnectorProperties::Gcs(_))
            || matches!(self, ConnectorProperties::Azblob(_))
    }

    pub async fn create_split_enumerator(
        self,
        context: crate::source::base::SourceEnumeratorContextRef,
    ) -> crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
        let enumerator: Box<dyn AnySplitEnumerator> = dispatch_source_prop!(self, |prop| Box::new(
            <PropType as SourceProperties>::SplitEnumerator::new(*prop, context).await?
        ));
        Ok(enumerator)
    }

    pub async fn create_split_reader(
        self,
        splits: Vec<SplitImpl>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
        mut opt: crate::source::CreateSplitReaderOpt,
    ) -> Result<(BoxSourceChunkStream, crate::source::CreateSplitReaderResult)> {
        opt.support_multiple_splits = self.support_multiple_splits();
        tracing::debug!(
            ?splits,
            support_multiple_splits = opt.support_multiple_splits,
            "spawning connector split reader",
        );

        dispatch_source_prop!(self, |prop| create_split_readers(
            *prop,
            splits,
            parser_config,
            source_ctx,
            columns,
            opt
        )
        .await)
    }
}
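
// A hedged end-to-end sketch of how the pieces above fit together (the setup values are
// illustrative and error handling is elided; this is not the full executor wiring):
//
// ```ignore
// let props = ConnectorProperties::extract(with_options, true)?;
// let mut enumerator = props.clone().create_split_enumerator(context).await?;
// let splits = enumerator.list_splits().await?;
// let (chunk_stream, _result) = props
//     .create_split_reader(splits, parser_config, source_ctx, None, Default::default())
//     .await?;
// ```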

for_all_sources!(impl_split);
for_all_connections!(impl_connection);

impl From<&SplitImpl> for ConnectorSplit {
    fn from(split: &SplitImpl) -> Self {
        dispatch_split_impl!(split, |inner| {
            ConnectorSplit {
                split_type: String::from(PropType::SOURCE_NAME),
                encoded_split: inner.encode_to_bytes().to_vec(),
            }
        })
    }
}

impl TryFrom<&ConnectorSplit> for SplitImpl {
    type Error = crate::error::ConnectorError;

    fn try_from(split: &ConnectorSplit) -> std::result::Result<Self, Self::Error> {
        let split_type = split.split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            {
                <PropType as SourceProperties>::Split::restore_from_bytes(
                    split.encoded_split.as_ref(),
                )
                .map(Into::into)
            },
            |other| bail!("connector '{}' is not supported", other)
        )
    }
}

impl SplitImpl {
    fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
        let split_type = split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            <PropType as SourceProperties>::Split::restore_from_json(value).map(Into::into),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn is_cdc_split(&self) -> bool {
        matches!(
            self,
            MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) | SqlServerCdc(_)
        )
    }

    /// Gets the start offset of a CDC split. Panics if called on a non-CDC split.
    pub fn get_cdc_split_offset(&self) -> String {
        match self {
            MysqlCdc(split) => split.start_offset().clone().unwrap_or_default(),
            PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(),
            MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(),
            CitusCdc(split) => split.start_offset().clone().unwrap_or_default(),
            SqlServerCdc(split) => split.start_offset().clone().unwrap_or_default(),
            _ => unreachable!("get_cdc_split_offset() is only for cdc split"),
        }
    }
}

impl SplitMetaData for SplitImpl {
    fn id(&self) -> SplitId {
        dispatch_split_impl!(self, |inner| inner.id())
    }

    fn encode_to_json(&self) -> JsonbVal {
        // `json!` is already imported at the top of this file.
        let inner = self.encode_to_json_inner().take();
        json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
    }

    fn restore_from_json(value: JsonbVal) -> Result<Self> {
        let mut value = value.take();
        let json_obj = value.as_object_mut().unwrap();
        let split_type = json_obj
            .remove(SPLIT_TYPE_FIELD)
            .unwrap()
            .as_str()
            .unwrap()
            .to_owned();
        let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
        Self::restore_from_json_inner(&split_type, inner_value.into())
    }

    fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset))
    }
}

impl SplitImpl {
    pub fn get_type(&self) -> String {
        dispatch_split_impl!(self, |_inner| PropType::SOURCE_NAME.to_owned())
    }

    pub fn update_in_place(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset)?);
        Ok(())
    }

    pub fn encode_to_json_inner(&self) -> JsonbVal {
        dispatch_split_impl!(self, |inner| inner.encode_to_json())
    }
}

use risingwave_common::types::DataType;

#[derive(Clone, Debug)]
pub struct Column {
    pub name: String,
    pub data_type: DataType,
    pub is_visible: bool,
}

/// A split identifier. `Arc<str>` keeps per-message clones cheap, since every
/// [`SourceMessage`] carries one.
pub type SplitId = Arc<str>;

#[derive(Debug, Clone)]
pub struct SourceMessage {
    pub key: Option<Vec<u8>>,
    pub payload: Option<Vec<u8>>,
    pub offset: String,
    pub split_id: SplitId,
    pub meta: SourceMeta,
}

impl SourceMessage {
    /// Creates a dummy message for testing purposes.
    pub fn dummy() -> Self {
        Self {
            key: None,
            payload: None,
            offset: "".to_owned(),
            split_id: "".into(),
            meta: SourceMeta::Empty,
        }
    }

    /// A CDC heartbeat message carries neither a key nor a payload.
    pub fn is_cdc_heartbeat(&self) -> bool {
        self.key.is_none() && self.payload.is_none()
    }
}

#[derive(Debug, Clone)]
pub enum SourceMeta {
    Kafka(KafkaMeta),
    Kinesis(KinesisMeta),
    Pulsar(PulsarMeta),
    Nexmark(NexmarkMeta),
    GooglePubsub(GooglePubsubMeta),
    Datagen(DatagenMeta),
    DebeziumCdc(DebeziumCdcMeta),
    Nats(NatsMeta),
    Empty,
}

/// Messages are compared by offset, split id, and payload only; `key` and `meta` are not
/// considered.
impl PartialEq for SourceMessage {
    fn eq(&self, other: &Self) -> bool {
        self.offset == other.offset
            && self.split_id == other.split_id
            && self.payload == other.payload
    }
}
impl Eq for SourceMessage {}

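/// The persistent state of a split: an id plus a JSON-serializable position that can be
/// written to the state store and restored on recovery. `encode_to_json` and
/// `restore_from_json` must round-trip. A minimal implementor sketch (hedged: `MySplit` and
/// its fields are illustrative only):
///
/// ```ignore
/// #[derive(Serialize, Deserialize)]
/// struct MySplit { id: String, offset: String }
///
/// impl SplitMetaData for MySplit {
///     fn id(&self) -> SplitId {
///         self.id.as_str().into()
///     }
///     fn encode_to_json(&self) -> JsonbVal {
///         serde_json::to_value(self).unwrap().into()
///     }
///     fn restore_from_json(value: JsonbVal) -> Result<Self> {
///         serde_json::from_value(value.take()).map_err(Into::into)
///     }
///     fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
///         self.offset = last_seen_offset;
///         Ok(())
///     }
/// }
/// ```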
pub trait SplitMetaData: Sized {
    fn id(&self) -> SplitId;
    fn encode_to_bytes(&self) -> Bytes {
        self.encode_to_json()
            .as_scalar_ref()
            .value_serialize()
            .into()
    }
    fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
        Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
    }

    fn encode_to_json(&self) -> JsonbVal;
    fn restore_from_json(value: JsonbVal) -> Result<Self>;
    fn update_offset(&mut self, last_seen_offset: String) -> crate::error::ConnectorResult<()>;
}

/// Splits assigned to a source reader; `None` means no split has been assigned yet.
pub type ConnectorState = Option<Vec<SplitImpl>>;

#[cfg(test)]
mod tests {
    use maplit::*;
    use nexmark::event::EventType;

    use super::*;
    use crate::source::cdc::{DebeziumCdcSplit, Mysql};
    use crate::source::kafka::KafkaSplit;

    #[test]
    fn test_split_impl_get_fn() -> Result<()> {
        let split = KafkaSplit::new(0, Some(0), Some(0), "demo".to_owned());
        let split_impl = SplitImpl::Kafka(split.clone());
        let get_value = split_impl.into_kafka().unwrap();
        println!("{:?}", get_value);
        assert_eq!(split.encode_to_bytes(), get_value.encode_to_bytes());
        assert_eq!(split.encode_to_json(), get_value.encode_to_json());

        Ok(())
    }

    #[test]
    fn test_cdc_split_state() -> Result<()> {
        let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
        let split = DebeziumCdcSplit::<Mysql>::new(1001, Some(offset_str.to_owned()), None);
        let split_impl = SplitImpl::MysqlCdc(split);
        let encoded_split = split_impl.encode_to_bytes();
        let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );

        let encoded_split = split_impl.encode_to_json();
        let restored_split_impl = SplitImpl::restore_from_json(encoded_split)?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );
        Ok(())
    }

    #[test]
    fn test_extract_nexmark_config() {
        let props = convert_args!(btreemap!(
            "connector" => "nexmark",
            "nexmark.table.type" => "Person",
            "nexmark.split.num" => "1",
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();

        if let ConnectorProperties::Nexmark(props) = props {
            assert_eq!(props.table_type, Some(EventType::Person));
            assert_eq!(props.split_num, 1);
        } else {
            panic!("extract nexmark config failed");
        }
    }

    #[test]
    fn test_extract_kafka_config() {
        let props = convert_args!(btreemap!(
            "connector" => "kafka",
            "properties.bootstrap.server" => "b1,b2",
            "topic" => "test",
            "scan.startup.mode" => "earliest",
            "broker.rewrite.endpoints" => r#"{"b-1:9092":"dns-1", "b-2:9092":"dns-2"}"#,
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();
        if let ConnectorProperties::Kafka(k) = props {
            let btreemap = btreemap! {
                "b-1:9092".to_owned() => "dns-1".to_owned(),
                "b-2:9092".to_owned() => "dns-2".to_owned(),
            };
            assert_eq!(k.privatelink_common.broker_rewrite_map, Some(btreemap));
        } else {
            panic!("extract kafka config failed");
        }
    }

    #[test]
    fn test_extract_cdc_properties() {
        let user_props_mysql = convert_args!(btreemap!(
            "connector" => "mysql-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "3306",
            "database.user" => "root",
            "database.password" => "123456",
            "database.name" => "mydb",
            "table.name" => "products",
        ));

        let user_props_postgres = convert_args!(btreemap!(
            "connector" => "postgres-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "5432",
            "database.user" => "root",
            "database.password" => "654321",
            "schema.name" => "public",
            "database.name" => "mypgdb",
            "table.name" => "orders",
        ));

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_mysql),
            true,
        )
        .unwrap();
        if let ConnectorProperties::MysqlCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "3306");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "123456");
            assert_eq!(c.properties.get("database.name").unwrap(), "mydb");
            assert_eq!(c.properties.get("table.name").unwrap(), "products");
        } else {
            panic!("extract cdc config failed");
        }

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_postgres),
            true,
        )
        .unwrap();
        if let ConnectorProperties::PostgresCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "5432");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "654321");
            assert_eq!(c.properties.get("schema.name").unwrap(), "public");
            assert_eq!(c.properties.get("database.name").unwrap(), "mypgdb");
            assert_eq!(c.properties.get("table.name").unwrap(), "orders");
        } else {
            panic!("extract cdc config failed");
        }
    }
}