use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use rw_futures_util::select_all;
use serde::de::DeserializeOwned;
use serde_json::json;
use tokio::sync::mpsc;

use super::cdc::DebeziumCdcMeta;
use super::datagen::DatagenMeta;
use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::kinesis::KinesisMeta;
use super::monitor::SourceMetrics;
use super::nats::source::NatsMeta;
use super::nexmark::source::message::NexmarkMeta;
use super::pulsar::source::PulsarMeta;
use super::{AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use crate::error::ConnectorResult as Result;
use crate::parser::ParserConfig;
use crate::parser::schema_change::SchemaChangeEnvelope;
use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
use crate::source::monitor::EnumeratorMetrics;
use crate::with_options::WithOptions;
use crate::{
    WithOptionsSecResolved, dispatch_source_prop, dispatch_split_impl, for_all_connections,
    for_all_sources, impl_connection, impl_connector_properties, impl_split, match_source_name_str,
};

const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

pub const WEBHOOK_CONNECTOR: &str = "webhook";

pub trait TryFromBTreeMap: Sized + UnknownFields {
    /// Initialize the source properties from the raw `WITH` clause options.
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self>;
}

/// Properties of a source connector. Each connector's concrete types are tied together
/// here via associated types, and the `for_all_sources!` macro below generates the
/// `ConnectorProperties` and `SplitImpl` enums over all implementations.
pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug {
    const SOURCE_NAME: &'static str;
    type Split: SplitMetaData
        + TryFrom<SplitImpl, Error = crate::error::ConnectorError>
        + Into<SplitImpl>;
    type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
    type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

    /// Load additional info from [`PbSource`].
    fn init_from_pb_source(&mut self, _source: &PbSource) {}

    /// Load additional info from [`ExternalTableDesc`].
    fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}

pub trait UnknownFields {
    /// Fields in the `WITH` clause that were not recognized during deserialization.
    fn unknown_fields(&self) -> HashMap<String, String>;
}

impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let json_value = serde_json::to_value(props)?;
        let res = serde_json::from_value::<P>(json_value)?;

        if !deny_unknown_fields || res.unknown_fields().is_empty() {
            Ok(res)
        } else {
            bail!(
                "Unknown fields in the WITH clause: {:?}",
                res.unknown_fields()
            )
        }
    }
}
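
// A minimal sketch of how the blanket impl behaves. `DemoProps` is hypothetical;
// real property structs derive `serde::Deserialize` and collect leftover options
// (e.g. via a flattened map) to implement `UnknownFields`.
//
// ```ignore
// let props: BTreeMap<String, String> = maplit::btreemap! {
//     "topic".to_owned() => "t1".to_owned(),
//     "tpoic".to_owned() => "oops".to_owned(), // typo, ends up in `unknown_fields()`
// };
// // With `deny_unknown_fields == true`, the typo is rejected with an error listing
// // the unrecognized keys; with `false`, it is silently ignored.
// assert!(DemoProps::try_from_btreemap(props, true).is_err());
// ```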

#[derive(Default)]
pub struct CreateSplitReaderOpt {
    pub support_multiple_splits: bool,
    pub seek_to_latest: bool,
}

#[derive(Default)]
pub struct CreateSplitReaderResult {
    pub latest_splits: Option<Vec<SplitImpl>>,
    pub backfill_info: HashMap<SplitId, BackfillInfo>,
}

pub async fn create_split_readers<P: SourceProperties>(
    prop: P,
    splits: Vec<SplitImpl>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
    opt: CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> {
    let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
    let mut res = CreateSplitReaderResult {
        backfill_info: HashMap::new(),
        latest_splits: None,
    };
    if opt.support_multiple_splits {
        // A single reader consumes all assigned splits.
        let mut reader = P::SplitReader::new(
            prop.clone(),
            splits,
            parser_config.clone(),
            source_ctx.clone(),
            columns.clone(),
        )
        .await?;
        if opt.seek_to_latest {
            res.latest_splits = Some(reader.seek_to_latest().await?);
        }
        res.backfill_info = reader.backfill_info();
        Ok((reader.into_stream().boxed(), res))
    } else {
        // One reader per split; their streams are merged with `select_all`.
        let mut readers = try_join_all(splits.into_iter().map(|split| {
            P::SplitReader::new(
                prop.clone(),
                vec![split],
                parser_config.clone(),
                source_ctx.clone(),
                columns.clone(),
            )
        }))
        .await?;
        if opt.seek_to_latest {
            let mut latest_splits = vec![];
            for reader in &mut readers {
                latest_splits.extend(reader.seek_to_latest().await?);
            }
            res.latest_splits = Some(latest_splits);
        }
        res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
        Ok((
            select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
            res,
        ))
    }
}
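
// Sketch of the call path (assumed usage, not an exact excerpt): stream-side code
// typically goes through `ConnectorProperties::create_split_reader` below rather
// than calling this function directly.
//
// ```ignore
// let (chunk_stream, result) = create_split_readers(
//     kafka_props,     // some `P: SourceProperties`
//     assigned_splits, // `Vec<SplitImpl>` assigned by the meta node
//     parser_config,
//     source_ctx,
//     None,            // columns
//     CreateSplitReaderOpt { seek_to_latest: true, ..Default::default() },
// )
// .await?;
// ```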

/// [`SplitEnumerator`] lists the currently available splits of an external source.
/// It runs on the meta node, and its results drive split assignment to the readers.
#[async_trait]
pub trait SplitEnumerator: Sized + Send {
    type Split: SplitMetaData + Send;
    type Properties;

    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> Result<Self>;
    async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
    /// Cleanup hook invoked when fragments consuming this source are dropped,
    /// e.g., to delete a Kafka consumer group.
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
    /// Cleanup hook invoked when backfill fragments of this source finish.
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
}
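
// A minimal enumerator sketch (hypothetical `StaticEnumerator` / `StaticProps` /
// `StaticSplit` types), showing the contract: `list_splits` returns the full set of
// currently-known splits each time it is polled, and the caller diffs the result
// against its own state.
//
// ```ignore
// struct StaticEnumerator { props: StaticProps }
//
// #[async_trait]
// impl SplitEnumerator for StaticEnumerator {
//     type Properties = StaticProps;
//     type Split = StaticSplit;
//
//     async fn new(props: StaticProps, _ctx: SourceEnumeratorContextRef) -> Result<Self> {
//         Ok(Self { props })
//     }
//
//     async fn list_splits(&mut self) -> Result<Vec<StaticSplit>> {
//         Ok(vec![StaticSplit::default()]) // a single, fixed split
//     }
// }
// ```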

pub type SourceContextRef = Arc<SourceContext>;
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;

/// Dyn-compatible (object-safe) counterpart of [`SplitEnumerator`], with the
/// associated `Split` type erased into [`SplitImpl`].
#[async_trait]
pub trait AnySplitEnumerator: Send {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
}

#[async_trait]
impl<T: SplitEnumerator<Split: Into<SplitImpl>>> AnySplitEnumerator for T {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
        SplitEnumerator::list_splits(self)
            .await
            .map(|s| s.into_iter().map(|s| s.into()).collect())
    }

    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_drop_fragments(self, _fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_finish_backfill(self, _fragment_ids).await
    }
}

/// The max size of a chunk yielded by a source stream.
pub const MAX_CHUNK_SIZE: usize = 1024;

#[derive(Debug, Clone, Copy)]
pub struct SourceCtrlOpts {
    /// The max size of a chunk yielded by the source stream.
    pub chunk_size: usize,
    /// Whether to allow splitting a transaction into multiple chunks to meet `chunk_size`.
    pub split_txn: bool,
}

// These options are important enough that we deliberately don't impl `Default`,
// so every call site must choose the values explicitly rather than falling back
// to an unintentional default. (This negative impl requires the nightly
// `negative_impls` feature.)
impl !Default for SourceCtrlOpts {}

impl SourceCtrlOpts {
    #[cfg(test)]
    pub fn for_test() -> Self {
        SourceCtrlOpts {
            chunk_size: 256,
            split_txn: false,
        }
    }
}
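
// Because `Default` is deliberately un-implemented, call sites spell out every
// field. A sketch (`MAX_CHUNK_SIZE` is the usual choice for `chunk_size`):
//
// ```ignore
// let opts = SourceCtrlOpts {
//     chunk_size: MAX_CHUNK_SIZE,
//     split_txn: false,
// };
// ```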

#[derive(Debug)]
pub struct SourceEnumeratorContext {
    pub info: SourceEnumeratorInfo,
    pub metrics: Arc<EnumeratorMetrics>,
}

impl SourceEnumeratorContext {
    /// Create a dummy `SourceEnumeratorContext` for testing purposes, or for
    /// situations where a real context doesn't matter.
    pub fn dummy() -> SourceEnumeratorContext {
        SourceEnumeratorContext {
            info: SourceEnumeratorInfo { source_id: 0 },
            metrics: Arc::new(EnumeratorMetrics::default()),
        }
    }
}

#[derive(Clone, Debug)]
pub struct SourceEnumeratorInfo {
    pub source_id: u32,
}

#[derive(Debug, Clone)]
pub struct SourceContext {
    pub actor_id: u32,
    pub source_id: TableId,
    pub fragment_id: u32,
    pub source_name: String,
    pub metrics: Arc<SourceMetrics>,
    pub source_ctrl_opts: SourceCtrlOpts,
    pub connector_props: ConnectorProperties,
    /// The sender used to report schema change events to the meta node.
    pub schema_change_tx:
        Option<mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>>,
}

impl SourceContext {
    pub fn new(
        actor_id: u32,
        source_id: TableId,
        fragment_id: u32,
        source_name: String,
        metrics: Arc<SourceMetrics>,
        source_ctrl_opts: SourceCtrlOpts,
        connector_props: ConnectorProperties,
        schema_change_channel: Option<
            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
        >,
    ) -> Self {
        Self {
            actor_id,
            source_id,
            fragment_id,
            source_name,
            metrics,
            source_ctrl_opts,
            connector_props,
            schema_change_tx: schema_change_channel,
        }
    }

    /// Create a dummy `SourceContext` for testing purposes, or for situations
    /// where a real context doesn't matter.
    pub fn dummy() -> Self {
        Self::new(
            0,
            TableId::new(0),
            0,
            "dummy".to_owned(),
            Arc::new(SourceMetrics::default()),
            SourceCtrlOpts {
                chunk_size: MAX_CHUNK_SIZE,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        )
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceFormat {
    #[default]
    Invalid,
    Native,
    None,
    Debezium,
    DebeziumMongo,
    Maxwell,
    Canal,
    Upsert,
    Plain,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceEncode {
    #[default]
    Invalid,
    Native,
    None,
    Avro,
    Csv,
    Protobuf,
    Json,
    Bytes,
    Parquet,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SourceStruct {
    pub format: SourceFormat,
    pub encode: SourceEncode,
}

impl SourceStruct {
    pub fn new(format: SourceFormat, encode: SourceEncode) -> Self {
        Self { format, encode }
    }
}

/// Extract the [`SourceStruct`] from the protobuf source info. The legacy `row_format`
/// field takes precedence if present (for backward compatibility); otherwise the newer
/// `format` / `row_encode` pair is consulted.
pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};

    // Handle the legacy `row_format` field first (written by old versions).
    if let Ok(format) = info.get_row_format() {
        let (format, encode) = match format {
            RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
            RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
            RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
            RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
            RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
            RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
            RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
            RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
            RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
            RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
            RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
            RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json),
            RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
            RowFormatType::RowUnspecified => unreachable!(),
        };
        return Ok(SourceStruct::new(format, encode));
    }
    let source_format = info.get_format()?;
    let source_encode = info.get_row_encode()?;
    let (format, encode) = match (source_format, source_encode) {
        (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Protobuf) => {
            (SourceFormat::Plain, SourceEncode::Protobuf)
        }
        (PbFormatType::Debezium, PbEncodeType::Json) => {
            (SourceFormat::Debezium, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro),
        (PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json),
        (PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv),
        (PbFormatType::Plain, PbEncodeType::Parquet) => {
            (SourceFormat::Plain, SourceEncode::Parquet)
        }
        (PbFormatType::Native, PbEncodeType::Native) => {
            (SourceFormat::Native, SourceEncode::Native)
        }
        (PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
        (PbFormatType::Debezium, PbEncodeType::Avro) => {
            (SourceFormat::Debezium, SourceEncode::Avro)
        }
        (PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json),
        (PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro),
        (PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
            (SourceFormat::DebeziumMongo, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes),
        (PbFormatType::Upsert, PbEncodeType::Protobuf) => {
            (SourceFormat::Upsert, SourceEncode::Protobuf)
        }
        (format, encode) => {
            bail!(
                "Unsupported combination of format {:?} and encode {:?}",
                format,
                encode
            );
        }
    };
    Ok(SourceStruct::new(format, encode))
}
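
// For instance (illustrative values), `FORMAT PLAIN ENCODE JSON` in SQL arrives as
// `(PbFormatType::Plain, PbEncodeType::Json)` and maps to:
//
// ```ignore
// let s = extract_source_struct(&info)?;
// assert_eq!(s, SourceStruct::new(SourceFormat::Plain, SourceEncode::Json));
// ```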

/// Stream of [`SourceMessage`] batches.
pub type BoxSourceMessageStream =
    BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
/// Stream of [`StreamChunk`]s parsed from the messages from the external source.
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
/// Stream of [`StreamChunkWithState`]s parsed from the messages from the external source.
pub type BoxSourceChunkWithStateStream =
    BoxStream<'static, crate::error::ConnectorResult<StreamChunkWithState>>;

/// Stream of optional [`StreamChunk`]s parsed from the messages of a streaming file source.
pub type BoxStreamingFileSourceChunkStream =
    BoxStream<'static, crate::error::ConnectorResult<Option<StreamChunk>>>;

// Manually expanded trait alias for a chunk stream, to improve the IDE experience.
pub trait SourceChunkStream:
    Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}
impl<T> SourceChunkStream for T where
    T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}

pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;
469
470#[async_trait]
474pub trait SplitReader: Sized + Send {
475 type Properties;
476 type Split: SplitMetaData;
477
478 async fn new(
479 properties: Self::Properties,
480 state: Vec<Self::Split>,
481 parser_config: ParserConfig,
482 source_ctx: SourceContextRef,
483 columns: Option<Vec<Column>>,
484 ) -> crate::error::ConnectorResult<Self>;
485
486 fn into_stream(self) -> BoxSourceChunkStream;
487
488 fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
489 HashMap::new()
490 }
491
492 async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
493 Err(anyhow!("seek_to_latest is not supported for this connector").into())
494 }
495}
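
// The expected reader lifecycle, as exercised by `create_split_readers` above.
// A sketch, where `MyReader` stands in for any `impl SplitReader`:
//
// ```ignore
// let mut reader = MyReader::new(props, splits, parser_config, ctx, None).await?;
// let latest = reader.seek_to_latest().await?;  // optional, connector-specific
// let backfill = reader.backfill_info();        // optional, defaults to empty
// let mut stream = reader.into_stream();        // consumes the reader
// while let Some(chunk) = stream.next().await.transpose()? { /* ... */ }
// ```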
496
497#[derive(Debug, Clone)]
502pub enum BackfillInfo {
503 HasDataToBackfill {
504 latest_offset: String,
512 },
513 NoDataToBackfill,
518}
519
520for_all_sources!(impl_connector_properties);
521
522impl Default for ConnectorProperties {
523 fn default() -> Self {
524 ConnectorProperties::Test(Box::default())
525 }
526}
527
528impl ConnectorProperties {
529 pub fn is_new_fs_connector_hash_map(with_properties: &HashMap<String, String>) -> bool {
530 with_properties
531 .get(UPSTREAM_SOURCE_KEY)
532 .map(|s| {
533 s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
534 || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
535 || s.eq_ignore_ascii_case(GCS_CONNECTOR)
536 || s.eq_ignore_ascii_case(AZBLOB_CONNECTOR)
537 })
538 .unwrap_or(false)
539 }
540}

impl ConnectorProperties {
    /// Creates typed source properties from the raw `WITH` options.
    ///
    /// Resolves secret references, reads the `connector` field to determine the
    /// connector type, and dispatches to that connector's property struct.
    pub fn extract(
        with_properties: WithOptionsSecResolved,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let (options, secret_refs) = with_properties.into_parts();
        let mut options_with_secret =
            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
        let connector = options_with_secret
            .remove(UPSTREAM_SOURCE_KEY)
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::try_from_btreemap(options_with_secret, deny_unknown_fields)
                .map(ConnectorProperties::from),
            |other| bail!("connector '{}' is not supported", other)
        )
    }
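
    // Sketch of a typical call (see the tests at the bottom of this file for
    // complete examples with real connectors):
    //
    // ```ignore
    // let props = ConnectorProperties::extract(
    //     WithOptionsSecResolved::without_secrets(with_options),
    //     /* deny_unknown_fields */ true,
    // )?;
    // ```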
568
569 pub fn enable_drop_split(&self) -> bool {
570 matches!(
572 self,
573 ConnectorProperties::Kinesis(_) | ConnectorProperties::Nats(_)
574 )
575 }
576
577 pub fn enable_adaptive_splits(&self) -> bool {
579 matches!(self, ConnectorProperties::Nats(_))
580 }
581
582 pub fn init_from_pb_source(&mut self, source: &PbSource) {
584 dispatch_source_prop!(self, |prop| prop.init_from_pb_source(source))
585 }
586
587 pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
589 dispatch_source_prop!(self, |prop| prop
590 .init_from_pb_cdc_table_desc(cdc_table_desc))
591 }
592
593 pub fn support_multiple_splits(&self) -> bool {
594 matches!(self, ConnectorProperties::Kafka(_))
595 || matches!(self, ConnectorProperties::OpendalS3(_))
596 || matches!(self, ConnectorProperties::Gcs(_))
597 || matches!(self, ConnectorProperties::Azblob(_))
598 }
599
600 pub async fn create_split_enumerator(
601 self,
602 context: crate::source::base::SourceEnumeratorContextRef,
603 ) -> crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
604 let enumerator: Box<dyn AnySplitEnumerator> = dispatch_source_prop!(self, |prop| Box::new(
605 <PropType as SourceProperties>::SplitEnumerator::new(*prop, context).await?
606 ));
607 Ok(enumerator)
608 }
609
610 pub async fn create_split_reader(
611 self,
612 splits: Vec<SplitImpl>,
613 parser_config: ParserConfig,
614 source_ctx: SourceContextRef,
615 columns: Option<Vec<Column>>,
616 mut opt: crate::source::CreateSplitReaderOpt,
617 ) -> Result<(BoxSourceChunkStream, crate::source::CreateSplitReaderResult)> {
618 opt.support_multiple_splits = self.support_multiple_splits();
619 tracing::debug!(
620 ?splits,
621 support_multiple_splits = opt.support_multiple_splits,
622 "spawning connector split reader",
623 );
624
625 dispatch_source_prop!(self, |prop| create_split_readers(
626 *prop,
627 splits,
628 parser_config,
629 source_ctx,
630 columns,
631 opt
632 )
633 .await)
634 }
635}
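
// A sketch of how meta-side code might drive the type-erased enumerator (assumed
// usage; the actual scheduling loop lives in the meta crate):
//
// ```ignore
// let mut enumerator = props
//     .create_split_enumerator(Arc::new(SourceEnumeratorContext::dummy()))
//     .await?;
// let splits: Vec<SplitImpl> = enumerator.list_splits().await?;
// ```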

for_all_sources!(impl_split);
for_all_connections!(impl_connection);

impl From<&SplitImpl> for ConnectorSplit {
    fn from(split: &SplitImpl) -> Self {
        dispatch_split_impl!(split, |inner| {
            ConnectorSplit {
                split_type: String::from(PropType::SOURCE_NAME),
                encoded_split: inner.encode_to_bytes().to_vec(),
            }
        })
    }
}

impl TryFrom<&ConnectorSplit> for SplitImpl {
    type Error = crate::error::ConnectorError;

    fn try_from(split: &ConnectorSplit) -> std::result::Result<Self, Self::Error> {
        let split_type = split.split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            {
                <PropType as SourceProperties>::Split::restore_from_bytes(
                    split.encoded_split.as_ref(),
                )
                .map(Into::into)
            },
            |other| bail!("connector '{}' is not supported", other)
        )
    }
}

impl SplitImpl {
    fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
        let split_type = split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            <PropType as SourceProperties>::Split::restore_from_json(value).map(Into::into),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn is_cdc_split(&self) -> bool {
        matches!(
            self,
            MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) | SqlServerCdc(_)
        )
    }

    /// Get the current offset of a CDC split. Panics if the split is not a CDC split.
    pub fn get_cdc_split_offset(&self) -> String {
        match self {
            MysqlCdc(split) => split.start_offset().clone().unwrap_or_default(),
            PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(),
            MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(),
            CitusCdc(split) => split.start_offset().clone().unwrap_or_default(),
            SqlServerCdc(split) => split.start_offset().clone().unwrap_or_default(),
            _ => unreachable!("get_cdc_split_offset() is only for cdc split"),
        }
    }
}

impl SplitMetaData for SplitImpl {
    fn id(&self) -> SplitId {
        dispatch_split_impl!(self, |inner| inner.id())
    }

    fn encode_to_json(&self) -> JsonbVal {
        let inner = self.encode_to_json_inner().take();
        json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
    }

    fn restore_from_json(value: JsonbVal) -> Result<Self> {
        let mut value = value.take();
        let json_obj = value.as_object_mut().unwrap();
        let split_type = json_obj
            .remove(SPLIT_TYPE_FIELD)
            .unwrap()
            .as_str()
            .unwrap()
            .to_owned();
        let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
        Self::restore_from_json_inner(&split_type, inner_value.into())
    }

    fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset))
    }
}
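
// The persisted JSON envelope thus looks like the following (field names inside
// `split_info` are illustrative and connector-specific):
//
// ```ignore
// {
//     "split_type": "kafka",
//     "split_info": { "topic": "test", "partition": 0, ... }
// }
// ```
//
// `restore_from_json` peels off `split_type` first, then dispatches the inner
// object to the matching connector's `Split::restore_from_json`.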

impl SplitImpl {
    pub fn get_type(&self) -> String {
        dispatch_split_impl!(self, |_inner| PropType::SOURCE_NAME.to_owned())
    }

    pub fn update_in_place(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset)?);
        Ok(())
    }

    pub fn encode_to_json_inner(&self) -> JsonbVal {
        dispatch_split_impl!(self, |inner| inner.encode_to_json())
    }
}

use risingwave_common::types::DataType;

#[derive(Clone, Debug)]
pub struct Column {
    pub name: String,
    pub data_type: DataType,
    pub is_visible: bool,
}

pub type SplitId = Arc<str>;

/// The message pumped from the external source service.
/// Third-party message structs are eventually transformed into this struct.
#[derive(Debug, Clone)]
pub struct SourceMessage {
    pub key: Option<Vec<u8>>,
    pub payload: Option<Vec<u8>>,
    pub offset: String,
    pub split_id: SplitId,
    pub meta: SourceMeta,
}

impl SourceMessage {
    /// Create a dummy `SourceMessage` with all fields unset, for testing purposes.
    pub fn dummy() -> Self {
        Self {
            key: None,
            payload: None,
            offset: "".to_owned(),
            split_id: "".into(),
            meta: SourceMeta::Empty,
        }
    }

    /// Check whether the source message is a CDC heartbeat message.
    pub fn is_cdc_heartbeat(&self) -> bool {
        self.key.is_none() && self.payload.is_none()
    }
}

#[derive(Debug, Clone)]
pub enum SourceMeta {
    Kafka(KafkaMeta),
    Kinesis(KinesisMeta),
    Pulsar(PulsarMeta),
    Nexmark(NexmarkMeta),
    GooglePubsub(GooglePubsubMeta),
    Datagen(DatagenMeta),
    DebeziumCdc(DebeziumCdcMeta),
    Nats(NatsMeta),
    Empty,
}

/// Manual `PartialEq`: messages compare by offset, split and payload, ignoring
/// the `key` and `meta` fields.
impl PartialEq for SourceMessage {
    fn eq(&self, other: &Self) -> bool {
        self.offset == other.offset
            && self.split_id == other.split_id
            && self.payload == other.payload
    }
}
impl Eq for SourceMessage {}

/// The metadata of a split.
pub trait SplitMetaData: Sized {
    fn id(&self) -> SplitId;
    fn encode_to_bytes(&self) -> Bytes {
        self.encode_to_json()
            .as_scalar_ref()
            .value_serialize()
            .into()
    }
    fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
        Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
    }

    /// Encode the whole split metadata to a JSON object.
    fn encode_to_json(&self) -> JsonbVal;
    fn restore_from_json(value: JsonbVal) -> Result<Self>;
    fn update_offset(&mut self, last_seen_offset: String) -> crate::error::ConnectorResult<()>;
}
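
// A minimal `SplitMetaData` sketch for a hypothetical connector whose split is
// identified by a name and tracks a string offset (serde derives assumed):
//
// ```ignore
// #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
// struct MySplit { name: String, offset: String }
//
// impl SplitMetaData for MySplit {
//     fn id(&self) -> SplitId {
//         self.name.as_str().into()
//     }
//     fn encode_to_json(&self) -> JsonbVal {
//         serde_json::to_value(self.clone()).unwrap().into()
//     }
//     fn restore_from_json(value: JsonbVal) -> Result<Self> {
//         serde_json::from_value(value.take()).map_err(Into::into)
//     }
//     fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
//         self.offset = last_seen_offset;
//         Ok(())
//     }
// }
// ```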

/// [`ConnectorState`] maintains the info of the splits being consumed. Within a
/// specific split reader, it is never `None` and contains exactly the splits
/// assigned to that reader.
pub type ConnectorState = Option<Vec<SplitImpl>>;

#[cfg(test)]
mod tests {
    use maplit::*;
    use nexmark::event::EventType;

    use super::*;
    use crate::source::cdc::{DebeziumCdcSplit, Mysql};
    use crate::source::kafka::KafkaSplit;

    #[test]
    fn test_split_impl_get_fn() -> Result<()> {
        let split = KafkaSplit::new(0, Some(0), Some(0), "demo".to_owned());
        let split_impl = SplitImpl::Kafka(split.clone());
        let get_value = split_impl.into_kafka().unwrap();
        println!("{:?}", get_value);
        assert_eq!(split.encode_to_bytes(), get_value.encode_to_bytes());
        assert_eq!(split.encode_to_json(), get_value.encode_to_json());

        Ok(())
    }

    #[test]
    fn test_cdc_split_state() -> Result<()> {
        let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
        let split = DebeziumCdcSplit::<Mysql>::new(1001, Some(offset_str.to_owned()), None);
        let split_impl = SplitImpl::MysqlCdc(split);
        let encoded_split = split_impl.encode_to_bytes();
        let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );

        let encoded_split = split_impl.encode_to_json();
        let restored_split_impl = SplitImpl::restore_from_json(encoded_split)?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );
        Ok(())
    }

    #[test]
    fn test_extract_nexmark_config() {
        let props = convert_args!(btreemap!(
            "connector" => "nexmark",
            "nexmark.table.type" => "Person",
            "nexmark.split.num" => "1",
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();

        if let ConnectorProperties::Nexmark(props) = props {
            assert_eq!(props.table_type, Some(EventType::Person));
            assert_eq!(props.split_num, 1);
        } else {
            panic!("extract nexmark config failed");
        }
    }

    #[test]
    fn test_extract_kafka_config() {
        let props = convert_args!(btreemap!(
            "connector" => "kafka",
            "properties.bootstrap.server" => "b1,b2",
            "topic" => "test",
            "scan.startup.mode" => "earliest",
            "broker.rewrite.endpoints" => r#"{"b-1:9092":"dns-1", "b-2:9092":"dns-2"}"#,
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();
        if let ConnectorProperties::Kafka(k) = props {
            let btreemap = btreemap! {
                "b-1:9092".to_owned() => "dns-1".to_owned(),
                "b-2:9092".to_owned() => "dns-2".to_owned(),
            };
            assert_eq!(k.privatelink_common.broker_rewrite_map, Some(btreemap));
        } else {
            panic!("extract kafka config failed");
        }
    }

    #[test]
    fn test_extract_cdc_properties() {
        let user_props_mysql = convert_args!(btreemap!(
            "connector" => "mysql-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "3306",
            "database.user" => "root",
            "database.password" => "123456",
            "database.name" => "mydb",
            "table.name" => "products",
        ));

        let user_props_postgres = convert_args!(btreemap!(
            "connector" => "postgres-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "5432",
            "database.user" => "root",
            "database.password" => "654321",
            "schema.name" => "public",
            "database.name" => "mypgdb",
            "table.name" => "orders",
        ));

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_mysql),
            true,
        )
        .unwrap();
        if let ConnectorProperties::MysqlCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "3306");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "123456");
            assert_eq!(c.properties.get("database.name").unwrap(), "mydb");
            assert_eq!(c.properties.get("table.name").unwrap(), "products");
        } else {
            panic!("extract cdc config failed");
        }

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_postgres),
            true,
        )
        .unwrap();
        if let ConnectorProperties::PostgresCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "5432");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "654321");
            assert_eq!(c.properties.get("schema.name").unwrap(), "public");
            assert_eq!(c.properties.get("database.name").unwrap(), "mypgdb");
            assert_eq!(c.properties.get("table.name").unwrap(), "orders");
        } else {
            panic!("extract cdc config failed");
        }
    }
}