use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use rw_futures_util::select_all;
use serde::de::DeserializeOwned;
use serde_json::json;
use tokio::sync::mpsc;

use super::cdc::DebeziumCdcMeta;
use super::datagen::DatagenMeta;
use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::kinesis::KinesisMeta;
use super::monitor::SourceMetrics;
use super::nats::source::NatsMeta;
use super::nexmark::source::message::NexmarkMeta;
use super::pulsar::source::PulsarMeta;
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult as Result;
use crate::parser::ParserConfig;
use crate::parser::schema_change::SchemaChangeEnvelope;
use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
use crate::source::batch::BatchSourceSplitImpl;
use crate::source::monitor::EnumeratorMetrics;
use crate::with_options::WithOptions;
use crate::{
    WithOptionsSecResolved, WithPropertiesExt, dispatch_source_prop, dispatch_split_impl,
    for_all_connections, for_all_sources, impl_connection, impl_connector_properties, impl_split,
    match_source_name_str,
};

const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

pub const WEBHOOK_CONNECTOR: &str = "webhook";

#[derive(Clone)]
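/// Callback invoked when a CDC auto schema change fails, so that the embedder can report
/// the failure. The arguments are, in order: table id, table name, cdc table id,
/// upstream DDL, and failure info.
///
/// A minimal usage sketch (the closure body is illustrative only):
/// ```ignore
/// let cb = CdcAutoSchemaChangeFailCallback::new(
///     |table_id, table_name, cdc_table_id, upstream_ddl, fail_info| {
///         tracing::warn!(table_id, %table_name, %cdc_table_id, %upstream_ddl, %fail_info,
///             "cdc auto schema change failed");
///     },
/// );
/// ```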
pub struct CdcAutoSchemaChangeFailCallback(
    Arc<dyn Fn(u32, String, String, String, String) + Send + Sync>,
);

impl CdcAutoSchemaChangeFailCallback {
    pub fn new<F>(f: F) -> Self
    where
        F: Fn(u32, String, String, String, String) + Send + Sync + 'static,
    {
        Self(Arc::new(f))
    }

    pub fn call(
        &self,
        table_id: u32,
        table_name: String,
        cdc_table_id: String,
        upstream_ddl: String,
        fail_info: String,
    ) {
        self.0(table_id, table_name, cdc_table_id, upstream_ddl, fail_info);
    }
}

impl std::fmt::Debug for CdcAutoSchemaChangeFailCallback {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("CdcAutoSchemaChangeFailCallback")
    }
}
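
/// Constructs typed source properties from the raw `WITH` options map.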
pub trait TryFromBTreeMap: Sized + UnknownFields {
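    /// Deserializes the raw `WITH` options; rejects options matching no known field
    /// when `deny_unknown_fields` is set.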
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self>;
}

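/// Properties of a source connector, tying together its split, enumerator and reader
/// types so the generic source machinery below can be instantiated per connector.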
pub trait SourceProperties:
    TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug + EnforceSecret
{
    const SOURCE_NAME: &'static str;
    type Split: SplitMetaData
        + TryFrom<SplitImpl, Error = crate::error::ConnectorError>
        + Into<SplitImpl>;
    type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
    type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

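    /// Loads additional info from the `PbSource` catalog message. No-op by default.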
    fn init_from_pb_source(&mut self, _source: &PbSource) {}

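    /// Loads additional info from the external CDC table description. No-op by default.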
    fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}

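/// Reports `WITH` options that did not match any known field during deserialization,
/// so they can be rejected when unknown fields are denied.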
pub trait UnknownFields {
    fn unknown_fields(&self) -> HashMap<String, String>;
}

impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let json_value = serde_json::to_value(props)?;
        let res = serde_json::from_value::<P>(json_value)?;

        if !deny_unknown_fields || res.unknown_fields().is_empty() {
            Ok(res)
        } else {
            bail!(
                "Unknown fields in the WITH clause: {:?}",
                res.unknown_fields()
            )
        }
    }
}

#[derive(Default)]
pub struct CreateSplitReaderOpt {
    pub support_multiple_splits: bool,
    pub seek_to_latest: bool,
}

#[derive(Default)]
pub struct CreateSplitReaderResult {
    pub latest_splits: Option<Vec<SplitImpl>>,
    pub backfill_info: HashMap<SplitId, BackfillInfo>,
}

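/// Creates a stream of parsed chunks from the given splits. If the connector supports
/// reading multiple splits in one reader, a single reader handles them all; otherwise
/// one reader per split is created and their streams are merged.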
pub async fn create_split_readers<P: SourceProperties>(
    prop: P,
    splits: Vec<SplitImpl>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
    opt: CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> {
    let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
    let mut res = CreateSplitReaderResult {
        backfill_info: HashMap::new(),
        latest_splits: None,
    };
    if opt.support_multiple_splits {
        let mut reader = P::SplitReader::new(
            prop.clone(),
            splits,
            parser_config.clone(),
            source_ctx.clone(),
            columns.clone(),
        )
        .await?;
        if opt.seek_to_latest {
            res.latest_splits = Some(reader.seek_to_latest().await?);
        }
        res.backfill_info = reader.backfill_info();
        Ok((reader.into_stream().boxed(), res))
    } else {
        let mut readers = try_join_all(splits.into_iter().map(|split| {
            P::SplitReader::new(
                prop.clone(),
                vec![split],
                parser_config.clone(),
                source_ctx.clone(),
                columns.clone(),
            )
        }))
        .await?;
        if opt.seek_to_latest {
            let mut latest_splits = vec![];
            for reader in &mut readers {
                latest_splits.extend(reader.seek_to_latest().await?);
            }
            res.latest_splits = Some(latest_splits);
        }
        res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
        Ok((
            select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
            res,
        ))
    }
}

#[async_trait]
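/// Discovers the splits of a connector and reacts to fragment lifecycle events.
/// Implementations run on the meta node.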
pub trait SplitEnumerator: Sized + Send {
    type Split: SplitMetaData + Send;
    type Properties;

    async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
    -> Result<Self>;
    async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
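    /// Called when fragments consuming this source are dropped. No-op by default.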
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
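    /// Called when backfill finishes for the given fragments. No-op by default.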
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
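    /// Called periodically by the enumerator loop. No-op by default.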
    async fn on_tick(&mut self) -> Result<()> {
        Ok(())
    }
}

pub type SourceContextRef = Arc<SourceContext>;
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;

#[async_trait]
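/// Dyn-compatible counterpart of [`SplitEnumerator`] that erases the concrete split
/// type, yielding [`SplitImpl`]s instead.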
pub trait AnySplitEnumerator: Send {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
    async fn on_tick(&mut self) -> Result<()>;
}

#[async_trait]
impl<T: SplitEnumerator<Split: Into<SplitImpl>> + 'static> AnySplitEnumerator for T {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
        SplitEnumerator::list_splits(self)
            .await
            .map(|s| s.into_iter().map(|s| s.into()).collect())
    }

    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_drop_fragments(self, _fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_finish_backfill(self, _fragment_ids).await
    }

    async fn on_tick(&mut self) -> Result<()> {
        SplitEnumerator::on_tick(self).await
    }
}

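/// The maximum number of rows in a [`StreamChunk`] produced by a source reader.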
pub const MAX_CHUNK_SIZE: usize = 1024;

#[derive(Debug, Clone, Copy)]
pub struct SourceCtrlOpts {
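    /// The max size of a chunk yielded by the source stream.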
    pub chunk_size: usize,
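    /// Whether to allow splitting a transaction into multiple chunks to meet the
    /// chunk size limit.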
    pub split_txn: bool,
}

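// These options matter enough that an implicit default would invite bugs; construction
// sites must spell them out (use `for_test()` in tests).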
impl !Default for SourceCtrlOpts {}

impl SourceCtrlOpts {
    #[cfg(test)]
    pub fn for_test() -> Self {
        SourceCtrlOpts {
            chunk_size: 256,
            split_txn: false,
        }
    }
}

#[derive(Debug)]
pub struct SourceEnumeratorContext {
    pub info: SourceEnumeratorInfo,
    pub metrics: Arc<EnumeratorMetrics>,
}

impl SourceEnumeratorContext {
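    /// Creates a dummy context, for tests or where the real context doesn't matter.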
    pub fn dummy() -> SourceEnumeratorContext {
        SourceEnumeratorContext {
            info: SourceEnumeratorInfo { source_id: 0 },
            metrics: Arc::new(EnumeratorMetrics::default()),
        }
    }
}

#[derive(Clone, Debug)]
pub struct SourceEnumeratorInfo {
    pub source_id: u32,
}

#[derive(Clone, Debug)]
pub struct SourceContext {
    pub actor_id: u32,
    pub source_id: TableId,
    pub fragment_id: u32,
    pub source_name: String,
    pub metrics: Arc<SourceMetrics>,
    pub source_ctrl_opts: SourceCtrlOpts,
    pub connector_props: ConnectorProperties,
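    /// Channel for reporting upstream schema changes; the receiver acknowledges each
    /// envelope through the embedded oneshot sender.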
    pub schema_change_tx:
        Option<mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>>,
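    /// Optional callback invoked when a CDC auto schema change fails.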
    pub on_cdc_auto_schema_change_failure: Option<CdcAutoSchemaChangeFailCallback>,
}

impl SourceContext {
    pub fn new(
        actor_id: u32,
        source_id: TableId,
        fragment_id: u32,
        source_name: String,
        metrics: Arc<SourceMetrics>,
        source_ctrl_opts: SourceCtrlOpts,
        connector_props: ConnectorProperties,
        schema_change_channel: Option<
            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
        >,
    ) -> Self {
        Self::new_with_auto_schema_change_callback(
            actor_id,
            source_id,
            fragment_id,
            source_name,
            metrics,
            source_ctrl_opts,
            connector_props,
            schema_change_channel,
            None,
        )
    }

    pub fn new_with_auto_schema_change_callback(
        actor_id: u32,
        source_id: TableId,
        fragment_id: u32,
        source_name: String,
        metrics: Arc<SourceMetrics>,
        source_ctrl_opts: SourceCtrlOpts,
        connector_props: ConnectorProperties,
        schema_change_channel: Option<
            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
        >,
        on_cdc_auto_schema_change_failure: Option<CdcAutoSchemaChangeFailCallback>,
    ) -> Self {
        Self {
            actor_id,
            source_id,
            fragment_id,
            source_name,
            metrics,
            source_ctrl_opts,
            connector_props,
            schema_change_tx: schema_change_channel,
            on_cdc_auto_schema_change_failure,
        }
    }

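    /// Creates a dummy context for tests.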
    pub fn dummy() -> Self {
        Self::new(
            0,
            TableId::new(0),
            0,
            "dummy".to_owned(),
            Arc::new(SourceMetrics::default()),
            SourceCtrlOpts {
                chunk_size: MAX_CHUNK_SIZE,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        )
    }

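    /// Invokes the registered CDC auto-schema-change failure callback, if any.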
    pub fn on_cdc_auto_schema_change_failure(
        &self,
        table_id: u32,
        table_name: String,
        cdc_table_id: String,
        upstream_ddl: String,
        fail_info: String,
    ) {
        if let Some(ref cdc_auto_schema_change_fail_callback) =
            self.on_cdc_auto_schema_change_failure
        {
            cdc_auto_schema_change_fail_callback.call(
                table_id,
                table_name,
                cdc_table_id,
                upstream_ddl,
                fail_info,
            );
        }
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceFormat {
    #[default]
    Invalid,
    Native,
    None,
    Debezium,
    DebeziumMongo,
    Maxwell,
    Canal,
    Upsert,
    Plain,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceEncode {
    #[default]
    Invalid,
    Native,
    None,
    Avro,
    Csv,
    Protobuf,
    Json,
    Bytes,
    Parquet,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SourceStruct {
    pub format: SourceFormat,
    pub encode: SourceEncode,
}

impl SourceStruct {
    pub fn new(format: SourceFormat, encode: SourceEncode) -> Self {
        Self { format, encode }
    }
}

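/// Extracts the format/encode pair from the protobuf source info, accepting both the
/// legacy `row_format` field and the newer separate `format` + `row_encode` fields.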
pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};

    if let Ok(format) = info.get_row_format() {
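        // Legacy path: sources created before `format`/`row_encode` were split out
        // carry the deprecated `row_format` field instead.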
        let (format, encode) = match format {
            RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
            RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
            RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
            RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
            RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
            RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
            RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
            RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
            RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
            RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
            RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
            RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json),
            RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
            RowFormatType::RowUnspecified => unreachable!(),
        };
        return Ok(SourceStruct::new(format, encode));
    }
    let source_format = info.get_format()?;
    let source_encode = info.get_row_encode()?;
    let (format, encode) = match (source_format, source_encode) {
        (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Protobuf) => {
            (SourceFormat::Plain, SourceEncode::Protobuf)
        }
        (PbFormatType::Debezium, PbEncodeType::Json) => {
            (SourceFormat::Debezium, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro),
        (PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json),
        (PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv),
        (PbFormatType::Plain, PbEncodeType::Parquet) => {
            (SourceFormat::Plain, SourceEncode::Parquet)
        }
        (PbFormatType::Native, PbEncodeType::Native) => {
            (SourceFormat::Native, SourceEncode::Native)
        }
        (PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
        (PbFormatType::Debezium, PbEncodeType::Avro) => {
            (SourceFormat::Debezium, SourceEncode::Avro)
        }
        (PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json),
        (PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro),
        (PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
            (SourceFormat::DebeziumMongo, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes),
        (PbFormatType::Upsert, PbEncodeType::Protobuf) => {
            (SourceFormat::Upsert, SourceEncode::Protobuf)
        }
        (format, encode) => {
            bail!(
                "Unsupported combination of format {:?} and encode {:?}",
                format,
                encode
            );
        }
    };
    Ok(SourceStruct::new(format, encode))
}

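/// Stream of batches of raw [`SourceMessage`]s from an external source.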
pub type BoxSourceMessageStream =
    BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
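/// Stream of [`StreamChunk`]s parsed from source messages.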
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
pub type BoxSourceChunkWithStateStream =
    BoxStream<'static, crate::error::ConnectorResult<StreamChunkWithState>>;

pub type BoxStreamingFileSourceChunkStream =
    BoxStream<'static, crate::error::ConnectorResult<Option<StreamChunk>>>;

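/// Trait alias for any `Send`able stream of chunk results, convenient in generic bounds.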
pub trait SourceChunkStream:
    Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}
impl<T> SourceChunkStream for T where
    T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}

pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;

#[async_trait]
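/// The connector-side reading interface: a [`SplitReader`] consumes its assigned splits
/// and turns the raw messages into a stream of parsed [`StreamChunk`]s.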
pub trait SplitReader: Sized + Send {
    type Properties;
    type Split: SplitMetaData;

    async fn new(
        properties: Self::Properties,
        state: Vec<Self::Split>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> crate::error::ConnectorResult<Self>;

    fn into_stream(self) -> BoxSourceChunkStream;

    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        HashMap::new()
    }

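    /// Seeks to the latest offsets and returns the updated splits.
    /// Unsupported by default.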
    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        Err(anyhow!("seek_to_latest is not supported for this connector").into())
    }
}

#[derive(Debug, Clone)]
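/// Information used to decide whether source backfill should start and when it has
/// caught up with the upstream.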
pub enum BackfillInfo {
    HasDataToBackfill {
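        /// The latest offset available in the split, used to tell when backfill
        /// has caught up.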
        latest_offset: String,
    },
    NoDataToBackfill,
}

for_all_sources!(impl_connector_properties);

impl Default for ConnectorProperties {
    fn default() -> Self {
        ConnectorProperties::Test(Box::default())
    }
}

impl ConnectorProperties {
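    /// Builds typed connector properties from the resolved `WITH` options: fills in
    /// secrets, pops the `connector` name, and dispatches to the matching type.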
    pub fn extract(
        with_properties: WithOptionsSecResolved,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let (options, secret_refs) = with_properties.into_parts();
        let mut options_with_secret =
            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
        let connector = options_with_secret
            .remove(UPSTREAM_SOURCE_KEY)
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::try_from_btreemap(options_with_secret, deny_unknown_fields)
                .map(ConnectorProperties::from),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn enforce_secret_source(
        with_properties: &impl WithPropertiesExt,
    ) -> crate::error::ConnectorResult<()> {
        let connector = with_properties
            .get_connector()
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        let key_iter = with_properties.key_iter();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::enforce_secret(key_iter),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

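    /// Whether the connector may drop splits during rebalancing (not only add them).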
    pub fn enable_drop_split(&self) -> bool {
        matches!(
            self,
            ConnectorProperties::Kinesis(_) | ConnectorProperties::Nats(_)
        )
    }

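    /// Whether the connector supports adaptively changing the number of splits.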
    pub fn enable_adaptive_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Nats(_))
    }

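    /// Loads additional info from the `PbSource` catalog message into the properties.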
    pub fn init_from_pb_source(&mut self, source: &PbSource) {
        dispatch_source_prop!(self, |prop| prop.init_from_pb_source(source))
    }

    pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
        dispatch_source_prop!(self, |prop| prop
            .init_from_pb_cdc_table_desc(cdc_table_desc))
    }

    pub fn support_multiple_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Kafka(_))
            || matches!(self, ConnectorProperties::OpendalS3(_))
            || matches!(self, ConnectorProperties::Gcs(_))
            || matches!(self, ConnectorProperties::Azblob(_))
    }

    pub async fn create_split_enumerator(
        self,
        context: crate::source::base::SourceEnumeratorContextRef,
    ) -> crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
        let enumerator: Box<dyn AnySplitEnumerator> = dispatch_source_prop!(self, |prop| Box::new(
            <PropType as SourceProperties>::SplitEnumerator::new(*prop, context).await?
        ));
        Ok(enumerator)
    }

    pub async fn create_split_reader(
        self,
        splits: Vec<SplitImpl>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
        mut opt: crate::source::CreateSplitReaderOpt,
    ) -> Result<(BoxSourceChunkStream, crate::source::CreateSplitReaderResult)> {
        opt.support_multiple_splits = self.support_multiple_splits();
        tracing::debug!(
            ?splits,
            support_multiple_splits = opt.support_multiple_splits,
            "spawning connector split reader",
        );

        dispatch_source_prop!(self, |prop| create_split_readers(
            *prop,
            splits,
            parser_config,
            source_ctx,
            columns,
            opt
        )
        .await)
    }
}

for_all_sources!(impl_split);
for_all_connections!(impl_connection);

impl From<&SplitImpl> for ConnectorSplit {
    fn from(split: &SplitImpl) -> Self {
        dispatch_split_impl!(split, |inner| {
            ConnectorSplit {
                split_type: String::from(PropType::SOURCE_NAME),
                encoded_split: inner.encode_to_bytes().to_vec(),
            }
        })
    }
}

impl TryFrom<&ConnectorSplit> for SplitImpl {
    type Error = crate::error::ConnectorError;

    fn try_from(split: &ConnectorSplit) -> std::result::Result<Self, Self::Error> {
        let split_type = split.split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            {
                <PropType as SourceProperties>::Split::restore_from_bytes(
                    split.encoded_split.as_ref(),
                )
                .map(Into::into)
            },
            |other| bail!("connector '{}' is not supported", other)
        )
    }
}

impl SplitImpl {
    fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
        let split_type = split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            <PropType as SourceProperties>::Split::restore_from_json(value).map(Into::into),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn is_cdc_split(&self) -> bool {
        matches!(
            self,
            MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) | SqlServerCdc(_)
        )
    }

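    /// Gets the start offset of a CDC split. Panics on non-CDC splits.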
    pub fn get_cdc_split_offset(&self) -> String {
        match self {
            MysqlCdc(split) => split.start_offset().clone().unwrap_or_default(),
            PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(),
            MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(),
            CitusCdc(split) => split.start_offset().clone().unwrap_or_default(),
            SqlServerCdc(split) => split.start_offset().clone().unwrap_or_default(),
            _ => unreachable!("get_cdc_split_offset() is only for cdc split"),
        }
    }

    pub fn into_batch_split(self) -> Option<BatchSourceSplitImpl> {
        #[expect(clippy::match_single_binding)]
        match self {
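            // No variant currently converts into a batch source split.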
            _ => None,
        }
    }
}

impl SplitMetaData for SplitImpl {
    fn id(&self) -> SplitId {
        dispatch_split_impl!(self, |inner| inner.id())
    }

    fn encode_to_json(&self) -> JsonbVal {
        let inner = self.encode_to_json_inner().take();
        json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner }).into()
    }

    fn restore_from_json(value: JsonbVal) -> Result<Self> {
        let mut value = value.take();
        let json_obj = value.as_object_mut().unwrap();
        let split_type = json_obj
            .remove(SPLIT_TYPE_FIELD)
            .unwrap()
            .as_str()
            .unwrap()
            .to_owned();
        let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
        Self::restore_from_json_inner(&split_type, inner_value.into())
    }

    fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset))
    }
}

impl SplitImpl {
    pub fn get_type(&self) -> String {
        dispatch_split_impl!(self, |_inner| PropType::SOURCE_NAME.to_owned())
    }

    pub fn update_in_place(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset)?);
        Ok(())
    }

    pub fn encode_to_json_inner(&self) -> JsonbVal {
        dispatch_split_impl!(self, |inner| inner.encode_to_json())
    }
}

use risingwave_common::types::DataType;

#[derive(Clone, Debug)]
pub struct Column {
    pub name: String,
    pub data_type: DataType,
    pub is_visible: bool,
}

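/// Split id carried by every source message; `Arc<str>` keeps clones cheap.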
pub type SplitId = Arc<str>;

#[derive(Debug, Clone)]
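/// A raw message read from an external source, before parsing.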
pub struct SourceMessage {
    pub key: Option<Vec<u8>>,
    pub payload: Option<Vec<u8>>,
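    /// The offset of this message within its split, e.g. a Kafka offset or a CDC
    /// change-event position, encoded as a string.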
    pub offset: String,
    pub split_id: SplitId,
    pub meta: SourceMeta,
}

impl SourceMessage {
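    /// Creates an empty message, for tests.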
    pub fn dummy() -> Self {
        Self {
            key: None,
            payload: None,
            offset: "".to_owned(),
            split_id: "".into(),
            meta: SourceMeta::Empty,
        }
    }

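    /// A CDC heartbeat message carries neither key nor payload.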
    pub fn is_cdc_heartbeat(&self) -> bool {
        self.key.is_none() && self.payload.is_none()
    }
}

#[derive(Debug, Clone)]
pub enum SourceMeta {
    Kafka(KafkaMeta),
    Kinesis(KinesisMeta),
    Pulsar(PulsarMeta),
    Nexmark(NexmarkMeta),
    GooglePubsub(GooglePubsubMeta),
    Datagen(DatagenMeta),
    DebeziumCdc(DebeziumCdcMeta),
    Nats(NatsMeta),
    Empty,
}

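// Equality considers only the offset, split id and payload; `key` and `meta` are not
// compared.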
impl PartialEq for SourceMessage {
    fn eq(&self, other: &Self) -> bool {
        self.offset == other.offset
            && self.split_id == other.split_id
            && self.payload == other.payload
    }
}
impl Eq for SourceMessage {}

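/// Metadata common to all split types, plus the (de)serialization helpers used to
/// persist split state.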
pub trait SplitMetaData: Sized {
    fn id(&self) -> SplitId;
    fn encode_to_bytes(&self) -> Bytes {
        self.encode_to_json()
            .as_scalar_ref()
            .value_serialize()
            .into()
    }
    fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
        Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
    }

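    /// Encodes the whole split metadata (not just the offset) into a JSON value.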
    fn encode_to_json(&self) -> JsonbVal;
    fn restore_from_json(value: JsonbVal) -> Result<Self>;
    fn update_offset(&mut self, last_seen_offset: String) -> crate::error::ConnectorResult<()>;
}

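/// The splits assigned to a source executor. `None` means nothing is assigned yet, in
/// which case the created source stream stays pending.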
pub type ConnectorState = Option<Vec<SplitImpl>>;

#[cfg(test)]
mod tests {
    use maplit::*;
    use nexmark::event::EventType;

    use super::*;
    use crate::source::cdc::{DebeziumCdcSplit, Mysql};
    use crate::source::kafka::KafkaSplit;

    #[test]
    fn test_split_impl_get_fn() -> Result<()> {
        let split = KafkaSplit::new(0, Some(0), Some(0), "demo".to_owned());
        let split_impl = SplitImpl::Kafka(split.clone());
        let get_value = split_impl.into_kafka().unwrap();
        println!("{:?}", get_value);
        assert_eq!(split.encode_to_bytes(), get_value.encode_to_bytes());
        assert_eq!(split.encode_to_json(), get_value.encode_to_json());

        Ok(())
    }

    #[test]
    fn test_cdc_split_state() -> Result<()> {
        let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
        let split = DebeziumCdcSplit::<Mysql>::new(1001, Some(offset_str.to_owned()), None);
        let split_impl = SplitImpl::MysqlCdc(split);
        let encoded_split = split_impl.encode_to_bytes();
        let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );

        let encoded_split = split_impl.encode_to_json();
        let restored_split_impl = SplitImpl::restore_from_json(encoded_split)?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );
        Ok(())
    }

    #[test]
    fn test_extract_nexmark_config() {
        let props = convert_args!(btreemap!(
            "connector" => "nexmark",
            "nexmark.table.type" => "Person",
            "nexmark.split.num" => "1",
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();

        if let ConnectorProperties::Nexmark(props) = props {
            assert_eq!(props.table_type, Some(EventType::Person));
            assert_eq!(props.split_num, 1);
        } else {
            panic!("extract nexmark config failed");
        }
    }

    #[test]
    fn test_extract_kafka_config() {
        let props = convert_args!(btreemap!(
            "connector" => "kafka",
            "properties.bootstrap.server" => "b1,b2",
            "topic" => "test",
            "scan.startup.mode" => "earliest",
            "broker.rewrite.endpoints" => r#"{"b-1:9092":"dns-1", "b-2:9092":"dns-2"}"#,
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();
        if let ConnectorProperties::Kafka(k) = props {
            let btreemap = btreemap! {
                "b-1:9092".to_owned() => "dns-1".to_owned(),
                "b-2:9092".to_owned() => "dns-2".to_owned(),
            };
            assert_eq!(k.privatelink_common.broker_rewrite_map, Some(btreemap));
        } else {
            panic!("extract kafka config failed");
        }
    }

    #[test]
    fn test_extract_cdc_properties() {
        let user_props_mysql = convert_args!(btreemap!(
            "connector" => "mysql-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "3306",
            "database.user" => "root",
            "database.password" => "123456",
            "database.name" => "mydb",
            "table.name" => "products",
        ));

        let user_props_postgres = convert_args!(btreemap!(
            "connector" => "postgres-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "5432",
            "database.user" => "root",
            "database.password" => "654321",
            "schema.name" => "public",
            "database.name" => "mypgdb",
            "table.name" => "orders",
        ));

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_mysql),
            true,
        )
        .unwrap();
        if let ConnectorProperties::MysqlCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "3306");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "123456");
            assert_eq!(c.properties.get("database.name").unwrap(), "mydb");
            assert_eq!(c.properties.get("table.name").unwrap(), "products");
        } else {
            panic!("extract cdc config failed");
        }

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_postgres),
            true,
        )
        .unwrap();
        if let ConnectorProperties::PostgresCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "5432");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "654321");
            assert_eq!(c.properties.get("schema.name").unwrap(), "public");
            assert_eq!(c.properties.get("database.name").unwrap(), "mypgdb");
            assert_eq!(c.properties.get("table.name").unwrap(), "orders");
        } else {
            panic!("extract cdc config failed");
        }
    }
}