1use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use async_trait::async_trait;
20use bytes::Bytes;
21use enum_as_inner::EnumAsInner;
22use futures::future::try_join_all;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt, TryStreamExt};
25use itertools::Itertools;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::bail;
28use risingwave_common::id::{ActorId, FragmentId, SourceId};
29use risingwave_common::secret::LocalSecretManager;
30use risingwave_common::types::{JsonbVal, Scalar};
31use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
32use risingwave_pb::plan_common::ExternalTableDesc;
33use risingwave_pb::source::ConnectorSplit;
34use rw_futures_util::select_all;
35use serde::de::DeserializeOwned;
36use serde_json::json;
37use tokio::sync::mpsc;
38
39use super::cdc::DebeziumCdcMeta;
40use super::datagen::DatagenMeta;
41use super::google_pubsub::GooglePubsubMeta;
42use super::kafka::KafkaMeta;
43use super::kinesis::KinesisMeta;
44use super::monitor::SourceMetrics;
45use super::nats::source::NatsMeta;
46use super::nexmark::source::message::NexmarkMeta;
47use super::pulsar::source::PulsarMeta;
48use crate::enforce_secret::EnforceSecret;
49use crate::error::ConnectorResult as Result;
50use crate::parser::ParserConfig;
51use crate::parser::schema_change::SchemaChangeEnvelope;
52use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
53use crate::source::batch::BatchSourceSplitImpl;
54use crate::source::monitor::EnumeratorMetrics;
55use crate::with_options::WithOptions;
56use crate::{
57 WithOptionsSecResolved, WithPropertiesExt, dispatch_source_prop, dispatch_split_impl,
58 for_all_connections, for_all_sources, impl_connection, impl_connector_properties, impl_split,
59 match_source_name_str,
60};
61
/// JSON key storing the connector name of an encoded split (see `SplitImpl::encode_to_json`).
const SPLIT_TYPE_FIELD: &str = "split_type";
/// JSON key storing the connector-specific payload of an encoded split.
const SPLIT_INFO_FIELD: &str = "split_info";
/// WITH-clause key that selects the connector implementation.
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

/// Connector name reserved for webhook sources.
pub const WEBHOOK_CONNECTOR: &str = "webhook";
67
/// Callback invoked when a CDC auto schema change fails.
///
/// Wrapped in an [`Arc`] so it is cheap to clone and can be shared across threads.
/// The arguments are `(source_id, table_name, cdc_table_id, upstream_ddl, fail_info)`,
/// matching [`CdcAutoSchemaChangeFailCallback::call`].
#[derive(Clone)]
pub struct CdcAutoSchemaChangeFailCallback(
    Arc<dyn Fn(SourceId, String, String, String, String) + Send + Sync>,
);
74
75impl CdcAutoSchemaChangeFailCallback {
76 pub fn new<F>(f: F) -> Self
77 where
78 F: Fn(SourceId, String, String, String, String) + Send + Sync + 'static,
79 {
80 Self(Arc::new(f))
81 }
82
83 pub fn call(
84 &self,
85 source_id: SourceId,
86 table_name: String,
87 cdc_table_id: String,
88 upstream_ddl: String,
89 fail_info: String,
90 ) {
91 self.0(source_id, table_name, cdc_table_id, upstream_ddl, fail_info);
92 }
93}
94
95impl std::fmt::Debug for CdcAutoSchemaChangeFailCallback {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 f.write_str("CdcAutoSchemaChangeFailCallback")
98 }
99}
/// Fallible conversion from the string key-value pairs of a `WITH` clause into a
/// typed properties struct.
pub trait TryFromBTreeMap: Sized + UnknownFields {
    /// Builds `Self` from `props`. When `deny_unknown_fields` is true, the
    /// conversion fails if the input contains keys the target type does not know.
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self>;
}
107
/// Properties of a source connector, tying together the connector's split,
/// enumerator, and reader types.
pub trait SourceProperties:
    TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug + EnforceSecret
{
    /// Canonical name of the connector (the value of the `connector` WITH option).
    const SOURCE_NAME: &'static str;
    /// Connector-specific split type, convertible to/from the type-erased [`SplitImpl`].
    type Split: SplitMetaData
        + TryFrom<SplitImpl, Error = crate::error::ConnectorError>
        + Into<SplitImpl>;
    /// Enumerator that discovers splits for this connector.
    type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
    /// Reader that consumes splits of this connector.
    type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

    /// Hook to further initialize the properties from the source catalog. No-op by default.
    fn init_from_pb_source(&mut self, _source: &PbSource) {}

    /// Hook to further initialize the properties from a CDC table description. No-op by default.
    fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}
127
/// Access to fields that were present in the input but not recognized during
/// deserialization.
pub trait UnknownFields {
    /// Returns the unrecognized key-value pairs.
    fn unknown_fields(&self) -> HashMap<String, String>;
}
132
133impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
134 fn try_from_btreemap(
135 props: BTreeMap<String, String>,
136 deny_unknown_fields: bool,
137 ) -> Result<Self> {
138 let json_value = serde_json::to_value(props)?;
139 let res = serde_json::from_value::<P>(json_value)?;
140
141 if !deny_unknown_fields || res.unknown_fields().is_empty() {
142 Ok(res)
143 } else {
144 bail!(
145 "Unknown fields in the WITH clause: {:?}",
146 res.unknown_fields()
147 )
148 }
149 }
150}
151
/// Options controlling how split readers are created in [`create_split_readers`].
#[derive(Default)]
pub struct CreateSplitReaderOpt {
    /// Create a single reader over all splits instead of one reader per split.
    pub support_multiple_splits: bool,
    /// Ask the reader(s) to seek to the latest offsets and report the resulting splits.
    pub seek_to_latest: bool,
}
157
/// Results produced while creating split readers, besides the stream itself.
#[derive(Default)]
pub struct CreateSplitReaderResult {
    /// Latest splits reported by the reader(s); only populated when
    /// `CreateSplitReaderOpt::seek_to_latest` was requested.
    pub latest_splits: Option<Vec<SplitImpl>>,
    /// Per-split backfill information reported by the reader(s).
    pub backfill_info: HashMap<SplitId, BackfillInfo>,
}
163
/// Builds split reader(s) for `splits` with connector properties `prop` and
/// returns a single merged event stream.
///
/// When `opt.support_multiple_splits` is set, one reader handles all splits;
/// otherwise one reader per split is created (concurrently via `try_join_all`)
/// and their event streams are merged with `select_all`.
///
/// Also returns a [`CreateSplitReaderResult`] carrying the readers' backfill
/// info and, if `opt.seek_to_latest` was requested, the latest splits.
pub async fn create_split_readers<P: SourceProperties>(
    prop: P,
    splits: Vec<SplitImpl>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
    opt: CreateSplitReaderOpt,
) -> Result<(BoxSourceReaderEventStream, CreateSplitReaderResult)> {
    // Convert the type-erased splits into this connector's concrete split type;
    // fails if any split belongs to a different connector.
    let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
    let mut res = CreateSplitReaderResult {
        backfill_info: HashMap::new(),
        latest_splits: None,
    };
    if opt.support_multiple_splits {
        // Single reader over all splits.
        let mut reader = P::SplitReader::new(
            prop.clone(),
            splits,
            parser_config.clone(),
            source_ctx.clone(),
            columns.clone(),
        )
        .await?;
        if opt.seek_to_latest {
            res.latest_splits = Some(reader.seek_to_latest().await?);
        }
        res.backfill_info = reader.backfill_info();
        Ok((reader.into_event_stream().boxed(), res))
    } else {
        // One reader per split, created concurrently.
        let mut readers = try_join_all(splits.into_iter().map(|split| {
            P::SplitReader::new(
                prop.clone(),
                vec![split],
                parser_config.clone(),
                source_ctx.clone(),
                columns.clone(),
            )
        }))
        .await?;
        if opt.seek_to_latest {
            let mut latest_splits = vec![];
            for reader in &mut readers {
                latest_splits.extend(reader.seek_to_latest().await?);
            }
            res.latest_splits = Some(latest_splits);
        }
        res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
        Ok((
            select_all(readers.into_iter().map(|r| r.into_event_stream())).boxed(),
            res,
        ))
    }
}
218
/// Discovers the splits of a source and reacts to lifecycle events.
#[async_trait]
pub trait SplitEnumerator: Sized + Send {
    type Split: SplitMetaData + Send;
    type Properties;

    /// Creates an enumerator from connector properties.
    async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
    -> Result<Self>;
    /// Lists the currently available splits.
    async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
    /// Called when the given fragments are dropped. No-op by default.
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<FragmentId>) -> Result<()> {
        Ok(())
    }
    /// Called when backfill finishes for the given fragments. No-op by default.
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<FragmentId>) -> Result<()> {
        Ok(())
    }
    /// Periodic hook invoked by the caller's tick loop. No-op by default.
    async fn on_tick(&mut self) -> Result<()> {
        Ok(())
    }
}
244
/// Shared reference to a [`SourceContext`].
pub type SourceContextRef = Arc<SourceContext>;
/// Shared reference to a [`SourceEnumeratorContext`].
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;
247
/// Object-safe, connector-erased version of [`SplitEnumerator`], yielding
/// [`SplitImpl`] instead of a connector-specific split type.
#[async_trait]
pub trait AnySplitEnumerator: Send {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
    async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()>;
    async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()>;
    async fn on_tick(&mut self) -> Result<()>;
}
256
257#[async_trait]
258impl<T: SplitEnumerator<Split: Into<SplitImpl>> + 'static> AnySplitEnumerator for T {
259 async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
260 SplitEnumerator::list_splits(self)
261 .await
262 .map(|s| s.into_iter().map(|s| s.into()).collect())
263 }
264
265 async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()> {
266 SplitEnumerator::on_drop_fragments(self, fragment_ids).await
267 }
268
269 async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()> {
270 SplitEnumerator::on_finish_backfill(self, fragment_ids).await
271 }
272
273 async fn on_tick(&mut self) -> Result<()> {
274 SplitEnumerator::on_tick(self).await
275 }
276}
277
/// Maximum chunk size (in rows) for source readers; also used as the
/// `chunk_size` of dummy contexts (see `SourceContext::dummy`).
pub const MAX_CHUNK_SIZE: usize = 1024;
280
/// Runtime knobs controlling how a source reader assembles stream chunks.
#[derive(Debug, Clone, Copy)]
pub struct SourceCtrlOpts {
    /// Maximum number of rows per emitted chunk.
    pub chunk_size: usize,
    /// NOTE(review): presumably controls splitting chunks at transaction
    /// boundaries — confirm exact semantics with the source executor.
    pub split_txn: bool,
}
288
// Deliberately opts out of `Default` (nightly `negative_impls`) so that callers
// must choose `chunk_size` / `split_txn` explicitly.
impl !Default for SourceCtrlOpts {}
292
impl SourceCtrlOpts {
    /// Fixed options for unit tests: small chunks, no txn splitting.
    #[cfg(test)]
    pub fn for_test() -> Self {
        SourceCtrlOpts {
            chunk_size: 256,
            split_txn: false,
        }
    }
}
302
/// Context handed to a [`SplitEnumerator`].
#[derive(Debug)]
pub struct SourceEnumeratorContext {
    pub info: SourceEnumeratorInfo,
    pub metrics: Arc<EnumeratorMetrics>,
}
308
309impl SourceEnumeratorContext {
310 pub fn dummy() -> SourceEnumeratorContext {
313 SourceEnumeratorContext {
314 info: SourceEnumeratorInfo {
315 source_id: 0.into(),
316 },
317 metrics: Arc::new(EnumeratorMetrics::default()),
318 }
319 }
320}
321
/// Static information about the source being enumerated.
#[derive(Clone, Debug)]
pub struct SourceEnumeratorInfo {
    pub source_id: SourceId,
}
326
/// Per-actor context threaded through source readers.
#[derive(Clone, Debug)]
pub struct SourceContext {
    pub actor_id: ActorId,
    pub source_id: SourceId,
    pub fragment_id: FragmentId,
    pub source_name: String,
    pub metrics: Arc<SourceMetrics>,
    pub source_ctrl_opts: SourceCtrlOpts,
    pub connector_props: ConnectorProperties,
    /// Channel for reporting schema change events; the paired oneshot sender
    /// lets the receiver acknowledge each event.
    pub schema_change_tx:
        Option<mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>>,
    /// Invoked when a CDC auto schema change fails; see
    /// [`SourceContext::on_cdc_auto_schema_change_failure`].
    pub on_cdc_auto_schema_change_failure: Option<CdcAutoSchemaChangeFailCallback>,
}
342
impl SourceContext {
    /// Creates a context without a CDC auto-schema-change failure callback.
    /// See [`SourceContext::new_with_auto_schema_change_callback`].
    pub fn new(
        actor_id: ActorId,
        source_id: SourceId,
        fragment_id: FragmentId,
        source_name: String,
        metrics: Arc<SourceMetrics>,
        source_ctrl_opts: SourceCtrlOpts,
        connector_props: ConnectorProperties,
        schema_change_channel: Option<
            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
        >,
    ) -> Self {
        Self::new_with_auto_schema_change_callback(
            actor_id,
            source_id,
            fragment_id,
            source_name,
            metrics,
            source_ctrl_opts,
            connector_props,
            schema_change_channel,
            None,
        )
    }

    /// Creates a context with all fields, including the optional callback to
    /// run when a CDC auto schema change fails.
    pub fn new_with_auto_schema_change_callback(
        actor_id: ActorId,
        source_id: SourceId,
        fragment_id: FragmentId,
        source_name: String,
        metrics: Arc<SourceMetrics>,
        source_ctrl_opts: SourceCtrlOpts,
        connector_props: ConnectorProperties,
        schema_change_channel: Option<
            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
        >,
        on_cdc_auto_schema_change_failure: Option<CdcAutoSchemaChangeFailCallback>,
    ) -> Self {
        Self {
            actor_id,
            source_id,
            fragment_id,
            source_name,
            metrics,
            source_ctrl_opts,
            connector_props,
            schema_change_tx: schema_change_channel,
            on_cdc_auto_schema_change_failure,
        }
    }

    /// Placeholder context for tests: zero ids, default metrics/properties,
    /// `MAX_CHUNK_SIZE` chunks, and no schema-change channel.
    pub fn dummy() -> Self {
        Self::new(
            0.into(),
            SourceId::new(0),
            0.into(),
            "dummy".to_owned(),
            Arc::new(SourceMetrics::default()),
            SourceCtrlOpts {
                chunk_size: MAX_CHUNK_SIZE,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        )
    }

    /// Invokes the registered failure callback (if any) with the details of a
    /// failed CDC auto schema change.
    pub fn on_cdc_auto_schema_change_failure(
        &self,
        source_id: SourceId,
        table_name: String,
        cdc_table_id: String,
        upstream_ddl: String,
        fail_info: String,
    ) {
        if let Some(ref cdc_auto_schema_change_fail_callback) =
            self.on_cdc_auto_schema_change_failure
        {
            cdc_auto_schema_change_fail_callback.call(
                source_id,
                table_name,
                cdc_table_id,
                upstream_ddl,
                fail_info,
            );
        }
    }
}
436
/// The `FORMAT ...` part of a source definition.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceFormat {
    #[default]
    Invalid,
    Native,
    None,
    Debezium,
    DebeziumMongo,
    Maxwell,
    Canal,
    Upsert,
    Plain,
}
450
/// The `ENCODE ...` part of a source definition.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceEncode {
    #[default]
    Invalid,
    Native,
    None,
    Avro,
    Csv,
    Protobuf,
    Json,
    Bytes,
    Parquet,
}
465
/// A validated `(format, encode)` combination. See [`extract_source_struct`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SourceStruct {
    pub format: SourceFormat,
    pub encode: SourceEncode,
}
471
472impl SourceStruct {
473 pub fn new(format: SourceFormat, encode: SourceEncode) -> Self {
474 Self { format, encode }
475 }
476}
477
/// Derives the `(format, encode)` pair from a stream source info.
///
/// The single `row_format` field, when set, takes precedence (it appears to be
/// the older encoding of this information); otherwise the separate `format` +
/// `row_encode` fields are consulted. Unsupported combinations yield an error.
pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};

    // Legacy single-field path.
    if let Ok(format) = info.get_row_format() {
        let (format, encode) = match format {
            RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
            RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
            RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
            RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
            RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
            RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
            RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
            RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
            RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
            RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
            RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
            RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json),
            RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
            RowFormatType::RowUnspecified => unreachable!(),
        };
        return Ok(SourceStruct::new(format, encode));
    }
    // Two-field path: validate the combination explicitly.
    let source_format = info.get_format()?;
    let source_encode = info.get_row_encode()?;
    let (format, encode) = match (source_format, source_encode) {
        (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Protobuf) => {
            (SourceFormat::Plain, SourceEncode::Protobuf)
        }
        (PbFormatType::Debezium, PbEncodeType::Json) => {
            (SourceFormat::Debezium, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro),
        (PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json),
        (PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv),
        (PbFormatType::Plain, PbEncodeType::Parquet) => {
            (SourceFormat::Plain, SourceEncode::Parquet)
        }
        (PbFormatType::Native, PbEncodeType::Native) => {
            (SourceFormat::Native, SourceEncode::Native)
        }
        (PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
        (PbFormatType::Debezium, PbEncodeType::Avro) => {
            (SourceFormat::Debezium, SourceEncode::Avro)
        }
        (PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json),
        (PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro),
        (PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
            (SourceFormat::DebeziumMongo, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes),
        (PbFormatType::Upsert, PbEncodeType::Protobuf) => {
            (SourceFormat::Upsert, SourceEncode::Protobuf)
        }
        (format, encode) => {
            bail!(
                "Unsupported combination of format {:?} and encode {:?}",
                format,
                encode
            );
        }
    };
    Ok(SourceStruct::new(format, encode))
}
545
/// Stream of raw message batches from a connector.
pub type BoxSourceMessageStream =
    BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
/// Stream of [`SourceMessageEvent`]s (data batches or split progress).
pub type BoxSourceMessageEventStream =
    BoxStream<'static, crate::error::ConnectorResult<SourceMessageEvent>>;
/// Stream of parsed [`StreamChunk`]s.
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
/// Stream of [`SourceReaderEvent`]s (chunks or split progress).
pub type BoxSourceReaderEventStream =
    BoxStream<'static, crate::error::ConnectorResult<SourceReaderEvent>>;
/// A chunk together with the latest state of the splits it came from.
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
/// Stream of [`StreamChunkWithState`].
pub type BoxSourceChunkWithStateStream =
    BoxStream<'static, crate::error::ConnectorResult<StreamChunkWithState>>;
563
/// Event yielded by message-level source readers.
#[derive(Debug)]
pub enum SourceMessageEvent {
    /// A batch of raw messages.
    Data(Vec<SourceMessage>),
    /// Per-split progress reported without accompanying data
    /// (NOTE(review): values are presumably offsets — confirm).
    SplitProgress(HashMap<SplitId, String>),
}
569
/// Event yielded by chunk-level source readers.
#[derive(Debug)]
pub enum SourceReaderEvent {
    /// A parsed chunk of data.
    DataChunk(StreamChunk),
    /// Per-split progress reported without accompanying data
    /// (NOTE(review): values are presumably offsets — confirm).
    SplitProgress(HashMap<SplitId, String>),
}
575
/// Chunk stream for streaming file sources.
/// NOTE(review): the meaning of `None` items is defined by the file-source
/// reader implementation — confirm before relying on it.
pub type BoxStreamingFileSourceChunkStream =
    BoxStream<'static, crate::error::ConnectorResult<Option<StreamChunk>>>;

/// Marker trait for any sendable `'static` stream of chunk results.
pub trait SourceChunkStream:
    Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}
/// Blanket impl: every qualifying stream is a [`SourceChunkStream`].
impl<T> SourceChunkStream for T where
    T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}

/// Boxed fallible stream of `M` items.
pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;
591
/// Reads messages from a set of splits and parses them into stream chunks.
#[async_trait]
pub trait SplitReader: Sized + Send {
    type Properties;
    type Split: SplitMetaData;

    /// Creates a reader over `state` (the splits to read, with their offsets).
    async fn new(
        properties: Self::Properties,
        state: Vec<Self::Split>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> crate::error::ConnectorResult<Self>;

    /// Turns the reader into a stream of parsed chunks.
    fn into_stream(self) -> BoxSourceChunkStream;

    /// Turns the reader into an event stream; by default simply wraps each
    /// chunk in [`SourceReaderEvent::DataChunk`].
    fn into_event_stream(self) -> BoxSourceReaderEventStream {
        self.into_stream()
            .map_ok(SourceReaderEvent::DataChunk)
            .boxed()
    }

    /// Per-split backfill info; empty by default.
    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        HashMap::new()
    }

    /// Seeks to the latest offsets and returns the resulting splits.
    /// Errors by default for connectors that do not support it.
    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        Err(anyhow!("seek_to_latest is not supported for this connector").into())
    }
}
624
/// Whether a split has data available to backfill, and up to where.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
    HasDataToBackfill {
        /// Latest offset reported by the reader for this split.
        latest_offset: String,
    },
    NoDataToBackfill,
}
647
// Generates the connector-property dispatch type (`ConnectorProperties`) over
// all supported sources — see the `impl_connector_properties` macro.
for_all_sources!(impl_connector_properties);
649
650impl Default for ConnectorProperties {
651 fn default() -> Self {
652 ConnectorProperties::Test(Box::default())
653 }
654}
655
impl ConnectorProperties {
    /// Extracts typed connector properties from a resolved WITH clause.
    ///
    /// Secrets are filled in via the global [`LocalSecretManager`]; the
    /// `connector` key (removed from the map) selects the property type.
    /// Fails for unsupported connectors, or for unrecognized keys when
    /// `deny_unknown_fields` is set.
    pub fn extract(
        with_properties: WithOptionsSecResolved,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let (options, secret_refs) = with_properties.into_parts();
        let mut options_with_secret =
            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
        let connector = options_with_secret
            .remove(UPSTREAM_SOURCE_KEY)
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::try_from_btreemap(options_with_secret, deny_unknown_fields)
                .map(ConnectorProperties::from),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    /// Checks the WITH-clause keys against the connector's secret-enforcement
    /// rules (see [`EnforceSecret`]).
    pub fn enforce_secret_source(
        with_properties: &impl WithPropertiesExt,
    ) -> crate::error::ConnectorResult<()> {
        let connector = with_properties
            .get_connector()
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        let key_iter = with_properties.key_iter();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::enforce_secret(key_iter),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    /// Whether this connector supports dropping splits at runtime.
    pub fn enable_drop_split(&self) -> bool {
        matches!(
            self,
            ConnectorProperties::Kinesis(_)
                | ConnectorProperties::Nats(_)
                | ConnectorProperties::GooglePubsub(_)
        )
    }

    /// Whether this connector supports adaptive split counts.
    pub fn enable_adaptive_splits(&self) -> bool {
        matches!(
            self,
            ConnectorProperties::Nats(_) | ConnectorProperties::GooglePubsub(_)
        )
    }

    /// Dispatches [`SourceProperties::init_from_pb_source`] on the inner properties.
    pub fn init_from_pb_source(&mut self, source: &PbSource) {
        dispatch_source_prop!(self, |prop| prop.init_from_pb_source(source))
    }

    /// Dispatches [`SourceProperties::init_from_pb_cdc_table_desc`] on the inner properties.
    pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
        dispatch_source_prop!(self, |prop| prop
            .init_from_pb_cdc_table_desc(cdc_table_desc))
    }

    /// Whether one reader can serve multiple splits for this connector.
    pub fn support_multiple_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Kafka(_))
            || matches!(self, ConnectorProperties::OpendalS3(_))
            || matches!(self, ConnectorProperties::Gcs(_))
            || matches!(self, ConnectorProperties::Azblob(_))
    }

    /// Creates a type-erased split enumerator for this connector.
    pub async fn create_split_enumerator(
        self,
        context: crate::source::base::SourceEnumeratorContextRef,
    ) -> crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
        let enumerator: Box<dyn AnySplitEnumerator> = dispatch_source_prop!(self, |prop| Box::new(
            <PropType as SourceProperties>::SplitEnumerator::new(*prop, context).await?
        ));
        Ok(enumerator)
    }

    /// Creates split reader(s) for this connector via [`create_split_readers`].
    /// `opt.support_multiple_splits` is overridden from
    /// [`Self::support_multiple_splits`].
    pub async fn create_split_reader(
        self,
        splits: Vec<SplitImpl>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
        mut opt: crate::source::CreateSplitReaderOpt,
    ) -> Result<(
        BoxSourceReaderEventStream,
        crate::source::CreateSplitReaderResult,
    )> {
        opt.support_multiple_splits = self.support_multiple_splits();
        tracing::debug!(
            ?splits,
            support_multiple_splits = opt.support_multiple_splits,
            "spawning connector split reader",
        );

        dispatch_source_prop!(self, |prop| create_split_readers(
            *prop,
            splits,
            parser_config,
            source_ctx,
            columns,
            opt
        )
        .await)
    }
}
774
// Generates the type-erased `SplitImpl` enum over all sources, and connection
// types over all connections.
for_all_sources!(impl_split);
for_all_connections!(impl_connection);
777
/// Encodes a split into its protobuf representation, tagged with the connector
/// name so the matching connector can decode it again.
impl From<&SplitImpl> for ConnectorSplit {
    fn from(split: &SplitImpl) -> Self {
        dispatch_split_impl!(split, |inner| {
            ConnectorSplit {
                split_type: String::from(PropType::SOURCE_NAME),
                encoded_split: inner.encode_to_bytes().to_vec(),
            }
        })
    }
}
788
/// Decodes a protobuf split by dispatching on its (lowercased) connector name.
/// Fails for unknown connectors.
impl TryFrom<&ConnectorSplit> for SplitImpl {
    type Error = crate::error::ConnectorError;

    fn try_from(split: &ConnectorSplit) -> std::result::Result<Self, Self::Error> {
        let split_type = split.split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            {
                <PropType as SourceProperties>::Split::restore_from_bytes(
                    split.encoded_split.as_ref(),
                )
                .map(Into::into)
            },
            |other| bail!("connector '{}' is not supported", other)
        )
    }
}
807
impl SplitImpl {
    /// Restores a connector-specific split from its JSON payload, dispatching
    /// on the (lowercased) connector name.
    fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
        let split_type = split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            <PropType as SourceProperties>::Split::restore_from_json(value).map(Into::into),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    /// Whether this is one of the CDC split variants.
    pub fn is_cdc_split(&self) -> bool {
        matches!(
            self,
            MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) | SqlServerCdc(_)
        )
    }

    /// Start offset of a CDC split (empty string if unset).
    ///
    /// # Panics
    /// Panics if called on a non-CDC split; guard with [`Self::is_cdc_split`].
    pub fn get_cdc_split_offset(&self) -> String {
        match self {
            MysqlCdc(split) => split.start_offset().clone().unwrap_or_default(),
            PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(),
            MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(),
            CitusCdc(split) => split.start_offset().clone().unwrap_or_default(),
            SqlServerCdc(split) => split.start_offset().clone().unwrap_or_default(),
            _ => unreachable!("get_cdc_split_offset() is only for cdc split"),
        }
    }

    /// Converts into the batch-split representation, or `None` if this
    /// connector has no batch counterpart.
    pub fn into_batch_split(self) -> Option<BatchSourceSplitImpl> {
        match self {
            SplitImpl::BatchPosixFs(batch_posix_fs_split) => {
                Some(BatchSourceSplitImpl::BatchPosixFs(batch_posix_fs_split))
            }
            _ => None,
        }
    }
}
847
848impl SplitMetaData for SplitImpl {
849 fn id(&self) -> SplitId {
850 dispatch_split_impl!(self, |inner| inner.id())
851 }
852
853 fn encode_to_json(&self) -> JsonbVal {
854 use serde_json::json;
855 let inner = self.encode_to_json_inner().take();
856 json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
857 }
858
859 fn restore_from_json(value: JsonbVal) -> Result<Self> {
860 let mut value = value.take();
861 let json_obj = value.as_object_mut().unwrap();
862 let split_type = json_obj
863 .remove(SPLIT_TYPE_FIELD)
864 .unwrap()
865 .as_str()
866 .unwrap()
867 .to_owned();
868 let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
869 Self::restore_from_json_inner(&split_type, inner_value.into())
870 }
871
872 fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
873 dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset))
874 }
875}
876
impl SplitImpl {
    /// Connector name of this split (`SourceProperties::SOURCE_NAME`).
    pub fn get_type(&self) -> String {
        dispatch_split_impl!(self, |_inner| PropType::SOURCE_NAME.to_owned())
    }

    /// Updates the split's offset in place; equivalent to
    /// [`SplitMetaData::update_offset`].
    pub fn update_in_place(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset)?);
        Ok(())
    }

    /// JSON encoding of the inner connector-specific split, without the
    /// `split_type` tag added by [`SplitMetaData::encode_to_json`].
    pub fn encode_to_json_inner(&self) -> JsonbVal {
        dispatch_split_impl!(self, |inner| inner.encode_to_json())
    }
}
891
892use risingwave_common::types::DataType;
893
/// Column description passed to split readers.
#[derive(Clone, Debug)]
pub struct Column {
    pub name: String,
    pub data_type: DataType,
    /// Whether the column is user-visible.
    pub is_visible: bool,
}
901
/// Identifier of a split; an `Arc<str>` so it is cheap to clone.
pub type SplitId = Arc<str>;
904
905#[derive(Debug, Clone)]
908pub struct SourceMessage {
909 pub key: Option<Vec<u8>>,
910 pub payload: Option<Vec<u8>>,
911 pub offset: String, pub split_id: SplitId,
913 pub meta: SourceMeta,
914}
915
916impl SourceMessage {
917 pub fn dummy() -> Self {
919 Self {
920 key: None,
921 payload: None,
922 offset: "".to_owned(),
923 split_id: "".into(),
924 meta: SourceMeta::Empty,
925 }
926 }
927
928 pub fn is_cdc_heartbeat(&self) -> bool {
930 self.key.is_none() && self.payload.is_none()
931 }
932}
933
/// Connector-specific metadata attached to a [`SourceMessage`].
#[derive(Debug, Clone)]
pub enum SourceMeta {
    Kafka(KafkaMeta),
    Kinesis(KinesisMeta),
    Pulsar(PulsarMeta),
    Nexmark(NexmarkMeta),
    GooglePubsub(GooglePubsubMeta),
    Datagen(DatagenMeta),
    DebeziumCdc(DebeziumCdcMeta),
    Nats(NatsMeta),
    /// No metadata.
    Empty,
}
947
/// Messages compare by offset, split id, and payload only; `key` and `meta` are
/// excluded. NOTE(review): confirm this is intentional wherever equality is
/// relied upon.
impl PartialEq for SourceMessage {
    fn eq(&self, other: &Self) -> bool {
        self.offset == other.offset
            && self.split_id == other.split_id
            && self.payload == other.payload
    }
}
impl Eq for SourceMessage {}
957
/// Persistent state of a split: identity plus resumable offset, with JSON and
/// byte-level (de)serialization.
pub trait SplitMetaData: Sized {
    /// Unique id of the split.
    fn id(&self) -> SplitId;
    /// Serializes the JSON encoding into bytes.
    fn encode_to_bytes(&self) -> Bytes {
        self.encode_to_json()
            .as_scalar_ref()
            .value_serialize()
            .into()
    }
    /// Inverse of [`Self::encode_to_bytes`]. Panics on malformed bytes.
    fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
        Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
    }

    /// Encodes the split (including its current offset) as JSON.
    fn encode_to_json(&self) -> JsonbVal;
    /// Inverse of [`Self::encode_to_json`].
    fn restore_from_json(value: JsonbVal) -> Result<Self>;
    /// Advances the split's offset to `last_seen_offset`.
    fn update_offset(&mut self, last_seen_offset: String) -> crate::error::ConnectorResult<()>;
}
976
/// Recovered split state of a source; `None` when no state has been persisted.
pub type ConnectorState = Option<Vec<SplitImpl>>;
982
#[cfg(test)]
mod tests {
    use maplit::*;
    use nexmark::event::EventType;

    use super::*;
    use crate::source::cdc::{DebeziumCdcSplit, Mysql};
    use crate::source::kafka::KafkaSplit;

    // The `into_kafka` accessor (generated by `EnumAsInner`) must return the
    // inner split unchanged, as verified via both encodings.
    #[test]
    fn test_split_impl_get_fn() -> Result<()> {
        let split = KafkaSplit::new(0, Some(0), Some(0), "demo".to_owned());
        let split_impl = SplitImpl::Kafka(split.clone());
        let get_value = split_impl.into_kafka().unwrap();
        println!("{:?}", get_value);
        assert_eq!(split.encode_to_bytes(), get_value.encode_to_bytes());
        assert_eq!(split.encode_to_json(), get_value.encode_to_json());

        Ok(())
    }

    // A CDC split must round-trip through both the byte and JSON encodings.
    #[test]
    fn test_cdc_split_state() -> Result<()> {
        let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
        let split = DebeziumCdcSplit::<Mysql>::new(1001, Some(offset_str.to_owned()), None);
        let split_impl = SplitImpl::MysqlCdc(split);
        let encoded_split = split_impl.encode_to_bytes();
        let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );

        let encoded_split = split_impl.encode_to_json();
        let restored_split_impl = SplitImpl::restore_from_json(encoded_split)?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );
        Ok(())
    }

    // `ConnectorProperties::extract` must produce typed nexmark properties.
    #[test]
    fn test_extract_nexmark_config() {
        let props = convert_args!(btreemap!(
            "connector" => "nexmark",
            "nexmark.table.type" => "Person",
            "nexmark.split.num" => "1",
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();

        if let ConnectorProperties::Nexmark(props) = props {
            assert_eq!(props.table_type, Some(EventType::Person));
            assert_eq!(props.split_num, 1);
        } else {
            panic!("extract nexmark config failed");
        }
    }

    // The JSON-valued `broker.rewrite.endpoints` option must be parsed into a map.
    #[test]
    fn test_extract_kafka_config() {
        let props = convert_args!(btreemap!(
            "connector" => "kafka",
            "properties.bootstrap.server" => "b1,b2",
            "topic" => "test",
            "scan.startup.mode" => "earliest",
            "broker.rewrite.endpoints" => r#"{"b-1:9092":"dns-1", "b-2:9092":"dns-2"}"#,
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();
        if let ConnectorProperties::Kafka(k) = props {
            let btreemap = btreemap! {
                "b-1:9092".to_owned() => "dns-1".to_owned(),
                "b-2:9092".to_owned() => "dns-2".to_owned(),
            };
            assert_eq!(k.privatelink_common.broker_rewrite_map, Some(btreemap));
        } else {
            panic!("extract kafka config failed");
        }
    }

    // CDC connectors keep their options as an untyped `properties` map; all
    // user-specified keys must be preserved verbatim.
    #[test]
    fn test_extract_cdc_properties() {
        let user_props_mysql = convert_args!(btreemap!(
            "connector" => "mysql-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "3306",
            "database.user" => "root",
            "database.password" => "123456",
            "database.name" => "mydb",
            "table.name" => "products",
        ));

        let user_props_postgres = convert_args!(btreemap!(
            "connector" => "postgres-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "5432",
            "database.user" => "root",
            "database.password" => "654321",
            "schema.name" => "public",
            "database.name" => "mypgdb",
            "table.name" => "orders",
        ));

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_mysql),
            true,
        )
        .unwrap();
        if let ConnectorProperties::MysqlCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "3306");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "123456");
            assert_eq!(c.properties.get("database.name").unwrap(), "mydb");
            assert_eq!(c.properties.get("table.name").unwrap(), "products");
        } else {
            panic!("extract cdc config failed");
        }

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_postgres),
            true,
        )
        .unwrap();
        if let ConnectorProperties::PostgresCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "5432");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "654321");
            assert_eq!(c.properties.get("schema.name").unwrap(), "public");
            assert_eq!(c.properties.get("database.name").unwrap(), "mypgdb");
            assert_eq!(c.properties.get("table.name").unwrap(), "orders");
        } else {
            panic!("extract cdc config failed");
        }
    }
}