1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use async_trait::async_trait;
20use bytes::Bytes;
21use enum_as_inner::EnumAsInner;
22use futures::future::try_join_all;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt};
25use itertools::Itertools;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::bail;
28use risingwave_common::id::{ActorId, FragmentId, SourceId};
29use risingwave_common::secret::LocalSecretManager;
30use risingwave_common::types::{JsonbVal, Scalar};
31use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
32use risingwave_pb::plan_common::ExternalTableDesc;
33use risingwave_pb::source::ConnectorSplit;
34use rw_futures_util::select_all;
35use serde::de::DeserializeOwned;
36use serde_json::json;
37use tokio::sync::mpsc;
38
39use super::cdc::DebeziumCdcMeta;
40use super::datagen::DatagenMeta;
41use super::google_pubsub::GooglePubsubMeta;
42use super::kafka::KafkaMeta;
43use super::kinesis::KinesisMeta;
44use super::monitor::SourceMetrics;
45use super::nats::source::NatsMeta;
46use super::nexmark::source::message::NexmarkMeta;
47use super::pulsar::source::PulsarMeta;
48use crate::enforce_secret::EnforceSecret;
49use crate::error::ConnectorResult as Result;
50use crate::parser::ParserConfig;
51use crate::parser::schema_change::SchemaChangeEnvelope;
52use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
53use crate::source::batch::BatchSourceSplitImpl;
54use crate::source::monitor::EnumeratorMetrics;
55use crate::with_options::WithOptions;
56use crate::{
57 WithOptionsSecResolved, WithPropertiesExt, dispatch_source_prop, dispatch_split_impl,
58 for_all_connections, for_all_sources, impl_connection, impl_connector_properties, impl_split,
59 match_source_name_str,
60};
61
/// JSON key under which `SplitImpl::encode_to_json` stores the connector name of a split.
const SPLIT_TYPE_FIELD: &str = "split_type";
/// JSON key under which `SplitImpl::encode_to_json` stores the connector-specific split payload.
const SPLIT_INFO_FIELD: &str = "split_info";
/// `WITH`-clause key naming the connector; `ConnectorProperties::extract` removes and
/// lowercases it to choose the property type.
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

/// Connector name string for webhook sources.
pub const WEBHOOK_CONNECTOR: &str = "webhook";
67
68#[derive(Clone)]
71pub struct CdcAutoSchemaChangeFailCallback(
72 Arc<dyn Fn(SourceId, String, String, String, String) + Send + Sync>,
73);
74
75impl CdcAutoSchemaChangeFailCallback {
76 pub fn new<F>(f: F) -> Self
77 where
78 F: Fn(SourceId, String, String, String, String) + Send + Sync + 'static,
79 {
80 Self(Arc::new(f))
81 }
82
83 pub fn call(
84 &self,
85 source_id: SourceId,
86 table_name: String,
87 cdc_table_id: String,
88 upstream_ddl: String,
89 fail_info: String,
90 ) {
91 self.0(source_id, table_name, cdc_table_id, upstream_ddl, fail_info);
92 }
93}
94
95impl std::fmt::Debug for CdcAutoSchemaChangeFailCallback {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 f.write_str("CdcAutoSchemaChangeFailCallback")
98 }
99}
/// Builds a value from the raw key/value pairs of a `WITH` clause.
pub trait TryFromBTreeMap: Sized + UnknownFields {
    /// Parses `props` into `Self`.
    ///
    /// When `deny_unknown_fields` is `true`, properties that `Self` reports through
    /// [`UnknownFields`] are expected to cause an error.
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self>;
}
107
/// Connector-level properties parsed from a source's `WITH` clause.
///
/// Ties a connector's property type to its split, split enumerator and split reader types.
pub trait SourceProperties:
    TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug + EnforceSecret
{
    /// Connector name. Also used as the split type name when encoding splits
    /// (`ConnectorSplit::split_type`, `SplitImpl::get_type`).
    const SOURCE_NAME: &'static str;
    /// Split type handled by this connector; converts to and from [`SplitImpl`].
    type Split: SplitMetaData
        + TryFrom<SplitImpl, Error = crate::error::ConnectorError>
        + Into<SplitImpl>;
    /// Enumerator that lists the splits for these properties.
    type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
    /// Reader that consumes splits for these properties.
    type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

    /// Hook to enrich the properties from the source catalog entry. Default: no-op.
    fn init_from_pb_source(&mut self, _source: &PbSource) {}

    /// Hook to enrich the properties from a CDC table description. Default: no-op.
    fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}
127
/// Reports properties that were supplied but not recognized by the implementing type.
pub trait UnknownFields {
    /// Returns the unrecognized key/value pairs. The blanket [`TryFromBTreeMap`] impl treats a
    /// non-empty result as an error when unknown fields are denied.
    fn unknown_fields(&self) -> HashMap<String, String>;
}
132
133impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
134 fn try_from_btreemap(
135 props: BTreeMap<String, String>,
136 deny_unknown_fields: bool,
137 ) -> Result<Self> {
138 let json_value = serde_json::to_value(props)?;
139 let res = serde_json::from_value::<P>(json_value)?;
140
141 if !deny_unknown_fields || res.unknown_fields().is_empty() {
142 Ok(res)
143 } else {
144 bail!(
145 "Unknown fields in the WITH clause: {:?}",
146 res.unknown_fields()
147 )
148 }
149 }
150}
151
/// Options controlling how [`create_split_readers`] builds readers.
#[derive(Default)]
pub struct CreateSplitReaderOpt {
    /// Build a single reader for all splits instead of one reader per split.
    /// `ConnectorProperties::create_split_reader` overwrites this from the connector type.
    pub support_multiple_splits: bool,
    /// Seek the reader(s) to the latest position and report the resulting splits.
    pub seek_to_latest: bool,
}

/// Extra information returned alongside the chunk stream by [`create_split_readers`].
#[derive(Default)]
pub struct CreateSplitReaderResult {
    /// Splits after seeking to the latest position; `Some` only when `seek_to_latest` was set.
    pub latest_splits: Option<Vec<SplitImpl>>,
    /// Backfill information reported by the reader(s), keyed by split id.
    pub backfill_info: HashMap<SplitId, BackfillInfo>,
}
163
164pub async fn create_split_readers<P: SourceProperties>(
165 prop: P,
166 splits: Vec<SplitImpl>,
167 parser_config: ParserConfig,
168 source_ctx: SourceContextRef,
169 columns: Option<Vec<Column>>,
170 opt: CreateSplitReaderOpt,
171) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> {
172 let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
173 let mut res = CreateSplitReaderResult {
174 backfill_info: HashMap::new(),
175 latest_splits: None,
176 };
177 if opt.support_multiple_splits {
178 let mut reader = P::SplitReader::new(
179 prop.clone(),
180 splits,
181 parser_config.clone(),
182 source_ctx.clone(),
183 columns.clone(),
184 )
185 .await?;
186 if opt.seek_to_latest {
187 res.latest_splits = Some(reader.seek_to_latest().await?);
188 }
189 res.backfill_info = reader.backfill_info();
190 Ok((reader.into_stream().boxed(), res))
191 } else {
192 let mut readers = try_join_all(splits.into_iter().map(|split| {
193 P::SplitReader::new(
196 prop.clone(),
197 vec![split],
198 parser_config.clone(),
199 source_ctx.clone(),
200 columns.clone(),
201 )
202 }))
203 .await?;
204 if opt.seek_to_latest {
205 let mut latest_splits = vec![];
206 for reader in &mut readers {
207 latest_splits.extend(reader.seek_to_latest().await?);
208 }
209 res.latest_splits = Some(latest_splits);
210 }
211 res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
212 Ok((
213 select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
214 res,
215 ))
216 }
217}
218
/// Discovers the splits of a source for a given set of connector properties.
#[async_trait]
pub trait SplitEnumerator: Sized + Send {
    /// Split type produced by this enumerator.
    type Split: SplitMetaData + Send;
    /// Connector properties used to construct the enumerator.
    type Properties;

    /// Creates an enumerator from connector properties and an enumerator context.
    async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
    -> Result<Self>;
    /// Lists the splits currently available from the upstream source.
    async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
    /// Notifies the enumerator that the given fragments were dropped. Default: no-op.
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<FragmentId>) -> Result<()> {
        Ok(())
    }
    /// Notifies the enumerator that backfill finished for the given fragments. Default: no-op.
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<FragmentId>) -> Result<()> {
        Ok(())
    }
    /// Hook called on each tick by whoever drives the enumerator. Default: no-op.
    async fn on_tick(&mut self) -> Result<()> {
        Ok(())
    }
}
244
/// Shared handle to a [`SourceContext`].
pub type SourceContextRef = Arc<SourceContext>;
/// Shared handle to a [`SourceEnumeratorContext`].
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;
247
/// Type-erased counterpart of [`SplitEnumerator`] that yields [`SplitImpl`]s.
///
/// Any `SplitEnumerator` whose split converts into `SplitImpl` implements this trait through a
/// blanket impl, which lets callers hold a `Box<dyn AnySplitEnumerator>`.
#[async_trait]
pub trait AnySplitEnumerator: Send {
    /// Lists the current splits, converted into [`SplitImpl`].
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
    /// See [`SplitEnumerator::on_drop_fragments`].
    async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()>;
    /// See [`SplitEnumerator::on_finish_backfill`].
    async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()>;
    /// See [`SplitEnumerator::on_tick`].
    async fn on_tick(&mut self) -> Result<()>;
}
256
257#[async_trait]
258impl<T: SplitEnumerator<Split: Into<SplitImpl>> + 'static> AnySplitEnumerator for T {
259 async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
260 SplitEnumerator::list_splits(self)
261 .await
262 .map(|s| s.into_iter().map(|s| s.into()).collect())
263 }
264
265 async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()> {
266 SplitEnumerator::on_drop_fragments(self, fragment_ids).await
267 }
268
269 async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()> {
270 SplitEnumerator::on_finish_backfill(self, fragment_ids).await
271 }
272
273 async fn on_tick(&mut self) -> Result<()> {
274 SplitEnumerator::on_tick(self).await
275 }
276}
277
/// Maximum chunk size (presumably in rows — confirm). [`SourceContext::dummy`] uses it as its
/// chunk size.
pub const MAX_CHUNK_SIZE: usize = 1024;
280
/// Options controlling how a source batches its output.
#[derive(Debug, Clone, Copy)]
pub struct SourceCtrlOpts {
    /// Target size of each emitted chunk.
    pub chunk_size: usize,
    /// Presumably whether a transaction may be split across chunks — confirm with the parsers.
    pub split_txn: bool,
}

// `SourceCtrlOpts` intentionally has no `Default`: callers must choose the options explicitly.
impl !Default for SourceCtrlOpts {}

impl SourceCtrlOpts {
    /// Options for unit tests: chunk size 256, `split_txn` disabled.
    #[cfg(test)]
    pub fn for_test() -> Self {
        SourceCtrlOpts {
            chunk_size: 256,
            split_txn: false,
        }
    }
}
302
/// Context handed to a [`SplitEnumerator`] when it is created.
#[derive(Debug)]
pub struct SourceEnumeratorContext {
    /// Information about the source being enumerated.
    pub info: SourceEnumeratorInfo,
    /// Enumerator metrics.
    pub metrics: Arc<EnumeratorMetrics>,
}
308
309impl SourceEnumeratorContext {
310 pub fn dummy() -> SourceEnumeratorContext {
313 SourceEnumeratorContext {
314 info: SourceEnumeratorInfo {
315 source_id: 0.into(),
316 },
317 metrics: Arc::new(EnumeratorMetrics::default()),
318 }
319 }
320}
321
/// Identifies the source a [`SplitEnumerator`] works on.
#[derive(Clone, Debug)]
pub struct SourceEnumeratorInfo {
    /// Id of the source.
    pub source_id: SourceId,
}
326
/// Per-actor context shared by the readers and parsers of a source.
#[derive(Clone, Debug)]
pub struct SourceContext {
    /// Actor running the source.
    pub actor_id: ActorId,
    /// Id of the source.
    pub source_id: SourceId,
    /// Fragment the actor belongs to.
    pub fragment_id: FragmentId,
    /// Name of the source.
    pub source_name: String,
    /// Source metrics.
    pub metrics: Arc<SourceMetrics>,
    /// Batching options (chunk size, transaction splitting).
    pub source_ctrl_opts: SourceCtrlOpts,
    /// Connector properties of the source.
    pub connector_props: ConnectorProperties,
    /// Channel for schema-change envelopes. Each envelope is paired with a oneshot sender,
    /// presumably used by the receiver to acknowledge the change — confirm with the receiver.
    pub schema_change_tx:
        Option<mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>>,
    /// Callback invoked by `on_cdc_auto_schema_change_failure`, if set.
    pub on_cdc_auto_schema_change_failure: Option<CdcAutoSchemaChangeFailCallback>,
}
342
343impl SourceContext {
344 pub fn new(
345 actor_id: ActorId,
346 source_id: SourceId,
347 fragment_id: FragmentId,
348 source_name: String,
349 metrics: Arc<SourceMetrics>,
350 source_ctrl_opts: SourceCtrlOpts,
351 connector_props: ConnectorProperties,
352 schema_change_channel: Option<
353 mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
354 >,
355 ) -> Self {
356 Self::new_with_auto_schema_change_callback(
357 actor_id,
358 source_id,
359 fragment_id,
360 source_name,
361 metrics,
362 source_ctrl_opts,
363 connector_props,
364 schema_change_channel,
365 None,
366 )
367 }
368
369 pub fn new_with_auto_schema_change_callback(
370 actor_id: ActorId,
371 source_id: SourceId,
372 fragment_id: FragmentId,
373 source_name: String,
374 metrics: Arc<SourceMetrics>,
375 source_ctrl_opts: SourceCtrlOpts,
376 connector_props: ConnectorProperties,
377 schema_change_channel: Option<
378 mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
379 >,
380 on_cdc_auto_schema_change_failure: Option<CdcAutoSchemaChangeFailCallback>,
381 ) -> Self {
382 Self {
383 actor_id,
384 source_id,
385 fragment_id,
386 source_name,
387 metrics,
388 source_ctrl_opts,
389 connector_props,
390 schema_change_tx: schema_change_channel,
391 on_cdc_auto_schema_change_failure,
392 }
393 }
394
395 pub fn dummy() -> Self {
398 Self::new(
399 0.into(),
400 SourceId::new(0),
401 0.into(),
402 "dummy".to_owned(),
403 Arc::new(SourceMetrics::default()),
404 SourceCtrlOpts {
405 chunk_size: MAX_CHUNK_SIZE,
406 split_txn: false,
407 },
408 ConnectorProperties::default(),
409 None,
410 )
411 }
412
413 pub fn on_cdc_auto_schema_change_failure(
416 &self,
417 source_id: SourceId,
418 table_name: String,
419 cdc_table_id: String,
420 upstream_ddl: String,
421 fail_info: String,
422 ) {
423 if let Some(ref cdc_auto_schema_change_fail_callback) =
424 self.on_cdc_auto_schema_change_failure
425 {
426 cdc_auto_schema_change_fail_callback.call(
427 source_id,
428 table_name,
429 cdc_table_id,
430 upstream_ddl,
431 fail_info,
432 );
433 }
434 }
435}
436
/// Format part of a source's row format. See [`extract_source_struct`] for how it is derived
/// from the catalog. `Invalid` is the default.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceFormat {
    #[default]
    Invalid,
    Native,
    None,
    Debezium,
    DebeziumMongo,
    Maxwell,
    Canal,
    Upsert,
    Plain,
}
450
/// Encode part of a source's row format. See [`extract_source_struct`] for how it is derived
/// from the catalog. `Invalid` is the default.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceEncode {
    #[default]
    Invalid,
    Native,
    None,
    Avro,
    Csv,
    Protobuf,
    Json,
    Bytes,
    Parquet,
}
465
/// A source's row format, split into its format and encode parts.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SourceStruct {
    /// Format part.
    pub format: SourceFormat,
    /// Encode part.
    pub encode: SourceEncode,
}

impl SourceStruct {
    /// Pairs a format with an encode.
    pub fn new(format: SourceFormat, encode: SourceEncode) -> Self {
        Self { format, encode }
    }
}
477
/// Derives the [`SourceStruct`] (format + encode) of a source from its catalog info.
///
/// If the combined `row_format` field can be read (`get_row_format` returns `Ok`), it alone
/// determines the result. Otherwise the separate `format` and `row_encode` fields are read and
/// must form one of the supported pairs listed below.
///
/// # Errors
/// Fails if `format` / `row_encode` cannot be read, or if they form an unsupported combination.
pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};

    // Combined row format, presumably written by older catalog versions (NOTE(review): confirm).
    if let Ok(format) = info.get_row_format() {
        let (format, encode) = match format {
            RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
            RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
            RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
            RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
            RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
            RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
            RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
            RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
            RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
            RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
            RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
            RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json),
            RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
            // Assumes `get_row_format` returns `Err` for the unspecified value, so this arm is
            // never taken. NOTE(review): confirm against the generated getter.
            RowFormatType::RowUnspecified => unreachable!(),
        };
        return Ok(SourceStruct::new(format, encode));
    }
    // Separate format/encode fields. Only the pairs below are accepted; note that
    // Upsert + Protobuf is reachable only through this path.
    let source_format = info.get_format()?;
    let source_encode = info.get_row_encode()?;
    let (format, encode) = match (source_format, source_encode) {
        (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Protobuf) => {
            (SourceFormat::Plain, SourceEncode::Protobuf)
        }
        (PbFormatType::Debezium, PbEncodeType::Json) => {
            (SourceFormat::Debezium, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro),
        (PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json),
        (PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv),
        (PbFormatType::Plain, PbEncodeType::Parquet) => {
            (SourceFormat::Plain, SourceEncode::Parquet)
        }
        (PbFormatType::Native, PbEncodeType::Native) => {
            (SourceFormat::Native, SourceEncode::Native)
        }
        (PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
        (PbFormatType::Debezium, PbEncodeType::Avro) => {
            (SourceFormat::Debezium, SourceEncode::Avro)
        }
        (PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json),
        (PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro),
        (PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
            (SourceFormat::DebeziumMongo, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes),
        (PbFormatType::Upsert, PbEncodeType::Protobuf) => {
            (SourceFormat::Upsert, SourceEncode::Protobuf)
        }
        (format, encode) => {
            bail!(
                "Unsupported combination of format {:?} and encode {:?}",
                format,
                encode
            );
        }
    };
    Ok(SourceStruct::new(format, encode))
}
545
/// Boxed stream of batches of [`SourceMessage`]s.
pub type BoxSourceMessageStream =
    BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
/// Boxed stream of [`StreamChunk`]s; the output type of [`SplitReader::into_stream`].
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
/// A chunk paired with split states keyed by split id (presumably the states after reading the
/// chunk — confirm with producers).
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
/// Boxed stream of [`StreamChunkWithState`] items.
pub type BoxSourceChunkWithStateStream =
    BoxStream<'static, crate::error::ConnectorResult<StreamChunkWithState>>;

/// Boxed stream of optional chunks, intended (by its name) for streaming file sources. The
/// meaning of a `None` item is defined by its producers — confirm there.
pub type BoxStreamingFileSourceChunkStream =
    BoxStream<'static, crate::error::ConnectorResult<Option<StreamChunk>>>;

/// Alias trait for any `Send + 'static` stream of `ConnectorResult<StreamChunk>`.
pub trait SourceChunkStream:
    Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}
impl<T> SourceChunkStream for T where
    T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}

/// Boxed `'static` stream of `ConnectorResult<M>`.
pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;
573
/// Reads data from a set of splits and exposes it as a stream of chunks.
#[async_trait]
pub trait SplitReader: Sized + Send {
    /// Connector properties used to construct the reader.
    type Properties;
    /// Split type consumed by this reader.
    type Split: SplitMetaData;

    /// Creates a reader for the splits in `state`.
    async fn new(
        properties: Self::Properties,
        state: Vec<Self::Split>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> crate::error::ConnectorResult<Self>;

    /// Consumes the reader and returns its output as a chunk stream.
    fn into_stream(self) -> BoxSourceChunkStream;

    /// Backfill information per split. Default: an empty map.
    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        HashMap::new()
    }

    /// Moves the reader to the latest position and returns the updated splits.
    ///
    /// The default implementation returns an error, so connectors must opt in.
    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        Err(anyhow!("seek_to_latest is not supported for this connector").into())
    }
}
600
/// Whether a split has data that needs to be backfilled, as reported by
/// [`SplitReader::backfill_info`].
#[derive(Debug, Clone)]
pub enum BackfillInfo {
    /// The split has data to backfill.
    HasDataToBackfill {
        /// Presumably the latest offset in the split when the info was collected — confirm
        /// with the connector implementations.
        latest_offset: String,
    },
    /// The split has no data to backfill.
    NoDataToBackfill,
}
623
// Generates the `ConnectorProperties` enum (presumably one variant per source connector, e.g.
// `Kafka`, `Kinesis`, `Test`) by expanding `impl_connector_properties` over every source.
for_all_sources!(impl_connector_properties);
625
626impl Default for ConnectorProperties {
627 fn default() -> Self {
628 ConnectorProperties::Test(Box::default())
629 }
630}
631
632impl ConnectorProperties {
633 pub fn extract(
640 with_properties: WithOptionsSecResolved,
641 deny_unknown_fields: bool,
642 ) -> Result<Self> {
643 let (options, secret_refs) = with_properties.into_parts();
644 let mut options_with_secret =
645 LocalSecretManager::global().fill_secrets(options, secret_refs)?;
646 let connector = options_with_secret
647 .remove(UPSTREAM_SOURCE_KEY)
648 .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
649 .to_lowercase();
650 match_source_name_str!(
651 connector.as_str(),
652 PropType,
653 PropType::try_from_btreemap(options_with_secret, deny_unknown_fields)
654 .map(ConnectorProperties::from),
655 |other| bail!("connector '{}' is not supported", other)
656 )
657 }
658
659 pub fn enforce_secret_source(
660 with_properties: &impl WithPropertiesExt,
661 ) -> crate::error::ConnectorResult<()> {
662 let connector = with_properties
663 .get_connector()
664 .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
665 .to_lowercase();
666 let key_iter = with_properties.key_iter();
667 match_source_name_str!(
668 connector.as_str(),
669 PropType,
670 PropType::enforce_secret(key_iter),
671 |other| bail!("connector '{}' is not supported", other)
672 )
673 }
674
675 pub fn enable_drop_split(&self) -> bool {
676 matches!(
678 self,
679 ConnectorProperties::Kinesis(_) | ConnectorProperties::Nats(_)
680 )
681 }
682
683 pub fn enable_adaptive_splits(&self) -> bool {
685 matches!(self, ConnectorProperties::Nats(_))
686 }
687
688 pub fn init_from_pb_source(&mut self, source: &PbSource) {
690 dispatch_source_prop!(self, |prop| prop.init_from_pb_source(source))
691 }
692
693 pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
695 dispatch_source_prop!(self, |prop| prop
696 .init_from_pb_cdc_table_desc(cdc_table_desc))
697 }
698
699 pub fn support_multiple_splits(&self) -> bool {
700 matches!(self, ConnectorProperties::Kafka(_))
701 || matches!(self, ConnectorProperties::OpendalS3(_))
702 || matches!(self, ConnectorProperties::Gcs(_))
703 || matches!(self, ConnectorProperties::Azblob(_))
704 }
705
706 pub async fn create_split_enumerator(
707 self,
708 context: crate::source::base::SourceEnumeratorContextRef,
709 ) -> crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
710 let enumerator: Box<dyn AnySplitEnumerator> = dispatch_source_prop!(self, |prop| Box::new(
711 <PropType as SourceProperties>::SplitEnumerator::new(*prop, context).await?
712 ));
713 Ok(enumerator)
714 }
715
716 pub async fn create_split_reader(
717 self,
718 splits: Vec<SplitImpl>,
719 parser_config: ParserConfig,
720 source_ctx: SourceContextRef,
721 columns: Option<Vec<Column>>,
722 mut opt: crate::source::CreateSplitReaderOpt,
723 ) -> Result<(BoxSourceChunkStream, crate::source::CreateSplitReaderResult)> {
724 opt.support_multiple_splits = self.support_multiple_splits();
725 tracing::debug!(
726 ?splits,
727 support_multiple_splits = opt.support_multiple_splits,
728 "spawning connector split reader",
729 );
730
731 dispatch_source_prop!(self, |prop| create_split_readers(
732 *prop,
733 splits,
734 parser_config,
735 source_ctx,
736 columns,
737 opt
738 )
739 .await)
740 }
741}
742
// Generates the `SplitImpl` enum (presumably one variant per source connector) and its
// conversions by expanding `impl_split` over every source.
for_all_sources!(impl_split);
// Expands `impl_connection` over every connection kind (see that macro for what it generates).
for_all_connections!(impl_connection);
745
/// Encodes a split into its protobuf form: the connector's `SOURCE_NAME` as the split type,
/// plus the bytes from [`SplitMetaData::encode_to_bytes`] of the wrapped split.
impl From<&SplitImpl> for ConnectorSplit {
    fn from(split: &SplitImpl) -> Self {
        dispatch_split_impl!(split, |inner| {
            ConnectorSplit {
                split_type: String::from(PropType::SOURCE_NAME),
                encoded_split: inner.encode_to_bytes().to_vec(),
            }
        })
    }
}
756
/// Decodes a protobuf split. `split_type` is lowercased to choose the connector, and the
/// payload is decoded with that connector's `restore_from_bytes`.
impl TryFrom<&ConnectorSplit> for SplitImpl {
    type Error = crate::error::ConnectorError;

    fn try_from(split: &ConnectorSplit) -> std::result::Result<Self, Self::Error> {
        let split_type = split.split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            {
                <PropType as SourceProperties>::Split::restore_from_bytes(
                    split.encoded_split.as_ref(),
                )
                .map(Into::into)
            },
            // Unknown connector names are reported as errors.
            |other| bail!("connector '{}' is not supported", other)
        )
    }
}
775
impl SplitImpl {
    /// Decodes `value` as a split of the connector named by `split_type` (case-insensitive).
    fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
        let split_type = split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            <PropType as SourceProperties>::Split::restore_from_json(value).map(Into::into),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    /// Whether this is a CDC split (MySQL, Postgres, MongoDB, Citus or SQL Server).
    pub fn is_cdc_split(&self) -> bool {
        matches!(
            self,
            MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) | SqlServerCdc(_)
        )
    }

    /// Returns the start offset of a CDC split, or an empty string if it has none.
    ///
    /// # Panics
    /// Panics if called on a non-CDC split; check [`Self::is_cdc_split`] first.
    pub fn get_cdc_split_offset(&self) -> String {
        // Each arm binds a different concrete split type, so the arms cannot be merged.
        match self {
            MysqlCdc(split) => split.start_offset().clone().unwrap_or_default(),
            PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(),
            MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(),
            CitusCdc(split) => split.start_offset().clone().unwrap_or_default(),
            SqlServerCdc(split) => split.start_offset().clone().unwrap_or_default(),
            _ => unreachable!("get_cdc_split_offset() is only for cdc split"),
        }
    }

    /// Converts into a batch split if this split has a batch counterpart (currently only
    /// `BatchPosixFs`); otherwise returns `None`.
    pub fn into_batch_split(self) -> Option<BatchSourceSplitImpl> {
        match self {
            SplitImpl::BatchPosixFs(batch_posix_fs_split) => {
                Some(BatchSourceSplitImpl::BatchPosixFs(batch_posix_fs_split))
            }
            _ => None,
        }
    }
}
815
816impl SplitMetaData for SplitImpl {
817 fn id(&self) -> SplitId {
818 dispatch_split_impl!(self, |inner| inner.id())
819 }
820
821 fn encode_to_json(&self) -> JsonbVal {
822 use serde_json::json;
823 let inner = self.encode_to_json_inner().take();
824 json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
825 }
826
827 fn restore_from_json(value: JsonbVal) -> Result<Self> {
828 let mut value = value.take();
829 let json_obj = value.as_object_mut().unwrap();
830 let split_type = json_obj
831 .remove(SPLIT_TYPE_FIELD)
832 .unwrap()
833 .as_str()
834 .unwrap()
835 .to_owned();
836 let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
837 Self::restore_from_json_inner(&split_type, inner_value.into())
838 }
839
840 fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
841 dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset))
842 }
843}
844
impl SplitImpl {
    /// Returns the connector name (`SOURCE_NAME`) of the wrapped split.
    pub fn get_type(&self) -> String {
        dispatch_split_impl!(self, |_inner| PropType::SOURCE_NAME.to_owned())
    }

    /// Updates the wrapped split's offset in place (same effect as `update_offset`).
    pub fn update_in_place(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset)?);
        Ok(())
    }

    /// Returns the JSON encoding of the wrapped connector-specific split, without the
    /// `split_type` / `split_info` envelope added by `encode_to_json`.
    pub fn encode_to_json_inner(&self) -> JsonbVal {
        dispatch_split_impl!(self, |inner| inner.encode_to_json())
    }
}
859
860use risingwave_common::types::DataType;
861
/// A column passed to a split reader (see [`SplitReader::new`]).
#[derive(Clone, Debug)]
pub struct Column {
    /// Column name.
    pub name: String,
    /// Column data type.
    pub data_type: DataType,
    /// Whether the column is visible (presumably to users, as opposed to hidden/internal
    /// columns — confirm with callers).
    pub is_visible: bool,
}
869
/// Identifier of a split. Stored as `Arc<str>`, so clones only bump a reference count.
pub type SplitId = Arc<str>;
872
873#[derive(Debug, Clone)]
876pub struct SourceMessage {
877 pub key: Option<Vec<u8>>,
878 pub payload: Option<Vec<u8>>,
879 pub offset: String, pub split_id: SplitId,
881 pub meta: SourceMeta,
882}
883
884impl SourceMessage {
885 pub fn dummy() -> Self {
887 Self {
888 key: None,
889 payload: None,
890 offset: "".to_owned(),
891 split_id: "".into(),
892 meta: SourceMeta::Empty,
893 }
894 }
895
896 pub fn is_cdc_heartbeat(&self) -> bool {
898 self.key.is_none() && self.payload.is_none()
899 }
900}
901
/// Connector-specific metadata attached to a [`SourceMessage`].
#[derive(Debug, Clone)]
pub enum SourceMeta {
    Kafka(KafkaMeta),
    Kinesis(KinesisMeta),
    Pulsar(PulsarMeta),
    Nexmark(NexmarkMeta),
    GooglePubsub(GooglePubsubMeta),
    Datagen(DatagenMeta),
    DebeziumCdc(DebeziumCdcMeta),
    Nats(NatsMeta),
    /// No metadata (used by `SourceMessage::dummy`).
    Empty,
}
915
916impl PartialEq for SourceMessage {
918 fn eq(&self, other: &Self) -> bool {
919 self.offset == other.offset
920 && self.split_id == other.split_id
921 && self.payload == other.payload
922 }
923}
924impl Eq for SourceMessage {}
925
/// Common interface of connector-specific splits: identity, (de)serialization and offset
/// tracking.
pub trait SplitMetaData: Sized {
    /// Returns the id of this split.
    fn id(&self) -> SplitId;
    /// Serializes the split: its JSON form from `encode_to_json`, value-serialized as JSONB.
    fn encode_to_bytes(&self) -> Bytes {
        self.encode_to_json()
            .as_scalar_ref()
            .value_serialize()
            .into()
    }
    /// Inverse of `encode_to_bytes`.
    ///
    /// # Panics
    /// Panics (via `unwrap`) if `bytes` cannot be decoded as a JSONB value; errors from
    /// `restore_from_json` are returned normally. NOTE(review): consider surfacing the decode
    /// failure as an error.
    fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
        Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
    }

    /// Encodes the split (including its current offset, if any) as JSON.
    fn encode_to_json(&self) -> JsonbVal;
    /// Decodes a split previously produced by `encode_to_json`.
    fn restore_from_json(value: JsonbVal) -> Result<Self>;
    /// Records `last_seen_offset` as the split's latest offset.
    fn update_offset(&mut self, last_seen_offset: String) -> crate::error::ConnectorResult<()>;
}
944
/// Optional list of split states (presumably `None` means no state is available — confirm
/// with users of this alias).
pub type ConnectorState = Option<Vec<SplitImpl>>;
950
#[cfg(test)]
mod tests {
    use maplit::*;
    use nexmark::event::EventType;

    use super::*;
    use crate::source::cdc::{DebeziumCdcSplit, Mysql};
    use crate::source::kafka::KafkaSplit;

    // Wrapping a concrete split in `SplitImpl` and unwrapping it again preserves its encodings.
    #[test]
    fn test_split_impl_get_fn() -> Result<()> {
        let split = KafkaSplit::new(0, Some(0), Some(0), "demo".to_owned());
        let split_impl = SplitImpl::Kafka(split.clone());
        let get_value = split_impl.into_kafka().unwrap();
        println!("{:?}", get_value);
        assert_eq!(split.encode_to_bytes(), get_value.encode_to_bytes());
        assert_eq!(split.encode_to_json(), get_value.encode_to_json());

        Ok(())
    }

    // A MySQL CDC split with a Debezium offset round-trips through both the byte encoding and
    // the JSON encoding of `SplitImpl`.
    #[test]
    fn test_cdc_split_state() -> Result<()> {
        let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
        let split = DebeziumCdcSplit::<Mysql>::new(1001, Some(offset_str.to_owned()), None);
        let split_impl = SplitImpl::MysqlCdc(split);
        // Round trip through bytes.
        let encoded_split = split_impl.encode_to_bytes();
        let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );

        // Round trip through JSON.
        let encoded_split = split_impl.encode_to_json();
        let restored_split_impl = SplitImpl::restore_from_json(encoded_split)?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );
        Ok(())
    }

    // `connector = nexmark` selects the Nexmark properties and parses its typed options.
    #[test]
    fn test_extract_nexmark_config() {
        let props = convert_args!(btreemap!(
            "connector" => "nexmark",
            "nexmark.table.type" => "Person",
            "nexmark.split.num" => "1",
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();

        if let ConnectorProperties::Nexmark(props) = props {
            assert_eq!(props.table_type, Some(EventType::Person));
            assert_eq!(props.split_num, 1);
        } else {
            panic!("extract nexmark config failed");
        }
    }

    // `broker.rewrite.endpoints` is parsed from a JSON string into the broker rewrite map.
    #[test]
    fn test_extract_kafka_config() {
        let props = convert_args!(btreemap!(
            "connector" => "kafka",
            "properties.bootstrap.server" => "b1,b2",
            "topic" => "test",
            "scan.startup.mode" => "earliest",
            "broker.rewrite.endpoints" => r#"{"b-1:9092":"dns-1", "b-2:9092":"dns-2"}"#,
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();
        if let ConnectorProperties::Kafka(k) = props {
            let btreemap = btreemap! {
                "b-1:9092".to_owned() => "dns-1".to_owned(),
                "b-2:9092".to_owned() => "dns-2".to_owned(),
            };
            assert_eq!(k.privatelink_common.broker_rewrite_map, Some(btreemap));
        } else {
            panic!("extract kafka config failed");
        }
    }

    // CDC connectors keep the user-supplied options (minus `connector`) in their property map.
    #[test]
    fn test_extract_cdc_properties() {
        let user_props_mysql = convert_args!(btreemap!(
            "connector" => "mysql-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "3306",
            "database.user" => "root",
            "database.password" => "123456",
            "database.name" => "mydb",
            "table.name" => "products",
        ));

        let user_props_postgres = convert_args!(btreemap!(
            "connector" => "postgres-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "5432",
            "database.user" => "root",
            "database.password" => "654321",
            "schema.name" => "public",
            "database.name" => "mypgdb",
            "table.name" => "orders",
        ));

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_mysql),
            true,
        )
        .unwrap();
        if let ConnectorProperties::MysqlCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "3306");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "123456");
            assert_eq!(c.properties.get("database.name").unwrap(), "mydb");
            assert_eq!(c.properties.get("table.name").unwrap(), "products");
        } else {
            panic!("extract cdc config failed");
        }

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_postgres),
            true,
        )
        .unwrap();
        if let ConnectorProperties::PostgresCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "5432");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "654321");
            assert_eq!(c.properties.get("schema.name").unwrap(), "public");
            assert_eq!(c.properties.get("database.name").unwrap(), "mypgdb");
            assert_eq!(c.properties.get("table.name").unwrap(), "orders");
        } else {
            panic!("extract cdc config failed");
        }
    }
}