use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use rw_futures_util::select_all;
use serde::de::DeserializeOwned;
use serde_json::json;
use tokio::sync::mpsc;

use super::cdc::DebeziumCdcMeta;
use super::datagen::DatagenMeta;
use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::kinesis::KinesisMeta;
use super::monitor::SourceMetrics;
use super::nats::source::NatsMeta;
use super::nexmark::source::message::NexmarkMeta;
use super::pulsar::source::PulsarMeta;
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult as Result;
use crate::parser::ParserConfig;
use crate::parser::schema_change::SchemaChangeEnvelope;
use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
use crate::source::batch::BatchSourceSplitImpl;
use crate::source::monitor::EnumeratorMetrics;
use crate::with_options::WithOptions;
use crate::{
    WithOptionsSecResolved, WithPropertiesExt, dispatch_source_prop, dispatch_split_impl,
    for_all_connections, for_all_sources, impl_connection, impl_connector_properties, impl_split,
    match_source_name_str,
};

const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

pub const WEBHOOK_CONNECTOR: &str = "webhook";
pub trait TryFromBTreeMap: Sized + UnknownFields {
    /// Initializes the typed properties from the raw untyped `WITH` options.
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self>;
}

/// Properties of a source connector, associating the connector with its split,
/// enumerator, and reader types.
pub trait SourceProperties:
    TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug + EnforceSecret
{
    const SOURCE_NAME: &'static str;
    type Split: SplitMetaData
        + TryFrom<SplitImpl, Error = crate::error::ConnectorError>
        + Into<SplitImpl>;
    type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
    type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

    /// Load additional info from `PbSource`. The default implementation is a no-op.
    fn init_from_pb_source(&mut self, _source: &PbSource) {}

    /// Load additional info from `ExternalTableDesc`. The default implementation is a no-op.
    fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}

pub trait UnknownFields {
    /// Unrecognized fields in the `WITH` clause, typically collected by `serde`'s flatten.
    fn unknown_fields(&self) -> HashMap<String, String>;
}

/// Deserializes the raw string map by round-tripping through `serde_json`, so
/// any `DeserializeOwned` properties type gets `TryFromBTreeMap` for free.
impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let json_value = serde_json::to_value(props)?;
        let res = serde_json::from_value::<P>(json_value)?;

        if !deny_unknown_fields || res.unknown_fields().is_empty() {
            Ok(res)
        } else {
            bail!(
                "Unknown fields in the WITH clause: {:?}",
                res.unknown_fields()
            )
        }
    }
}

#[derive(Default)]
pub struct CreateSplitReaderOpt {
    pub support_multiple_splits: bool,
    pub seek_to_latest: bool,
}

#[derive(Default)]
pub struct CreateSplitReaderResult {
    /// The latest splits after seeking, populated only when `seek_to_latest` is set.
    pub latest_splits: Option<Vec<SplitImpl>>,
    pub backfill_info: HashMap<SplitId, BackfillInfo>,
}

/// Creates the chunk stream for the given splits: either a single reader that
/// consumes all splits at once (when the connector supports it), or one reader
/// per split whose streams are merged with `select_all`.
pub async fn create_split_readers<P: SourceProperties>(
    prop: P,
    splits: Vec<SplitImpl>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
    opt: CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> {
    let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
    let mut res = CreateSplitReaderResult {
        backfill_info: HashMap::new(),
        latest_splits: None,
    };
    if opt.support_multiple_splits {
        let mut reader = P::SplitReader::new(
            prop.clone(),
            splits,
            parser_config.clone(),
            source_ctx.clone(),
            columns.clone(),
        )
        .await?;
        if opt.seek_to_latest {
            res.latest_splits = Some(reader.seek_to_latest().await?);
        }
        res.backfill_info = reader.backfill_info();
        Ok((reader.into_stream().boxed(), res))
    } else {
        let mut readers = try_join_all(splits.into_iter().map(|split| {
            P::SplitReader::new(
                prop.clone(),
                vec![split],
                parser_config.clone(),
                source_ctx.clone(),
                columns.clone(),
            )
        }))
        .await?;
        if opt.seek_to_latest {
            let mut latest_splits = vec![];
            for reader in &mut readers {
                latest_splits.extend(reader.seek_to_latest().await?);
            }
            res.latest_splits = Some(latest_splits);
        }
        res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
        Ok((
            select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
            res,
        ))
    }
}

/// [`SplitEnumerator`] discovers the splits of an upstream source, and is run
/// on the meta node.
#[async_trait]
pub trait SplitEnumerator: Sized + Send {
    type Split: SplitMetaData + Send;
    type Properties;

    async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
    -> Result<Self>;
    async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
    /// Called when fragments are dropped, so the connector can clean up per-fragment state.
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
    /// Called when backfill of the given fragments finishes.
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
}

pub type SourceContextRef = Arc<SourceContext>;
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;

/// Object-safe counterpart of [`SplitEnumerator`], with the concrete split
/// type erased into [`SplitImpl`].
#[async_trait]
pub trait AnySplitEnumerator: Send {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
}

#[async_trait]
impl<T: SplitEnumerator<Split: Into<SplitImpl>>> AnySplitEnumerator for T {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
        SplitEnumerator::list_splits(self)
            .await
            .map(|s| s.into_iter().map(|s| s.into()).collect())
    }

    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_drop_fragments(self, _fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_finish_backfill(self, _fragment_ids).await
    }
}

pub const MAX_CHUNK_SIZE: usize = 1024;

#[derive(Debug, Clone, Copy)]
pub struct SourceCtrlOpts {
    /// The max size of a chunk yielded by the source stream.
    pub chunk_size: usize,
    /// Whether to allow splitting a transaction into multiple chunks to meet the `chunk_size`.
    pub split_txn: bool,
}

// `Default` is intentionally not implemented: callers must specify the options
// explicitly instead of relying on implicit defaults.
impl !Default for SourceCtrlOpts {}

impl SourceCtrlOpts {
    #[cfg(test)]
    pub fn for_test() -> Self {
        SourceCtrlOpts {
            chunk_size: 256,
            split_txn: false,
        }
    }
}

#[derive(Debug)]
pub struct SourceEnumeratorContext {
    pub info: SourceEnumeratorInfo,
    pub metrics: Arc<EnumeratorMetrics>,
}

impl SourceEnumeratorContext {
    /// Create a dummy `SourceEnumeratorContext` for testing purposes, or for
    /// cases where the real context doesn't matter.
    pub fn dummy() -> SourceEnumeratorContext {
        SourceEnumeratorContext {
            info: SourceEnumeratorInfo { source_id: 0 },
            metrics: Arc::new(EnumeratorMetrics::default()),
        }
    }
}

#[derive(Clone, Debug)]
pub struct SourceEnumeratorInfo {
    pub source_id: u32,
}

#[derive(Debug, Clone)]
pub struct SourceContext {
    pub actor_id: u32,
    pub source_id: TableId,
    pub fragment_id: u32,
    pub source_name: String,
    pub metrics: Arc<SourceMetrics>,
    pub source_ctrl_opts: SourceCtrlOpts,
    pub connector_props: ConnectorProperties,
    /// Sender for schema-change events, paired with a oneshot channel to await
    /// acknowledgement of each event.
    pub schema_change_tx:
        Option<mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>>,
}

impl SourceContext {
    pub fn new(
        actor_id: u32,
        source_id: TableId,
        fragment_id: u32,
        source_name: String,
        metrics: Arc<SourceMetrics>,
        source_ctrl_opts: SourceCtrlOpts,
        connector_props: ConnectorProperties,
        schema_change_channel: Option<
            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
        >,
    ) -> Self {
        Self {
            actor_id,
            source_id,
            fragment_id,
            source_name,
            metrics,
            source_ctrl_opts,
            connector_props,
            schema_change_tx: schema_change_channel,
        }
    }

    /// Create a dummy `SourceContext` for testing purposes.
    pub fn dummy() -> Self {
        Self::new(
            0,
            TableId::new(0),
            0,
            "dummy".to_owned(),
            Arc::new(SourceMetrics::default()),
            SourceCtrlOpts {
                chunk_size: MAX_CHUNK_SIZE,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        )
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceFormat {
    #[default]
    Invalid,
    Native,
    None,
    Debezium,
    DebeziumMongo,
    Maxwell,
    Canal,
    Upsert,
    Plain,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceEncode {
    #[default]
    Invalid,
    Native,
    None,
    Avro,
    Csv,
    Protobuf,
    Json,
    Bytes,
    Parquet,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SourceStruct {
    pub format: SourceFormat,
    pub encode: SourceEncode,
}

impl SourceStruct {
    pub fn new(format: SourceFormat, encode: SourceEncode) -> Self {
        Self { format, encode }
    }
}

/// Extracts the format and encode from the protobuf source info, handling the
/// deprecated `row_format` field first and falling back to the newer
/// `format` + `row_encode` pair.
pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};

    // Handle the deprecated `row_format` field.
    if let Ok(format) = info.get_row_format() {
        let (format, encode) = match format {
            RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
            RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
            RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
            RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
            RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
            RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
            RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
            RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
            RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
            RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
            RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
            RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json),
            RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
            RowFormatType::RowUnspecified => unreachable!(),
        };
        return Ok(SourceStruct::new(format, encode));
    }
    let source_format = info.get_format()?;
    let source_encode = info.get_row_encode()?;
    let (format, encode) = match (source_format, source_encode) {
        (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Protobuf) => {
            (SourceFormat::Plain, SourceEncode::Protobuf)
        }
        (PbFormatType::Debezium, PbEncodeType::Json) => {
            (SourceFormat::Debezium, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro),
        (PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json),
        (PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv),
        (PbFormatType::Plain, PbEncodeType::Parquet) => {
            (SourceFormat::Plain, SourceEncode::Parquet)
        }
        (PbFormatType::Native, PbEncodeType::Native) => {
            (SourceFormat::Native, SourceEncode::Native)
        }
        (PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
        (PbFormatType::Debezium, PbEncodeType::Avro) => {
            (SourceFormat::Debezium, SourceEncode::Avro)
        }
        (PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json),
        (PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro),
        (PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
            (SourceFormat::DebeziumMongo, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes),
        (PbFormatType::Upsert, PbEncodeType::Protobuf) => {
            (SourceFormat::Upsert, SourceEncode::Protobuf)
        }
        (format, encode) => {
            bail!(
                "Unsupported combination of format {:?} and encode {:?}",
                format,
                encode
            );
        }
    };
    Ok(SourceStruct::new(format, encode))
}

/// Stream of batches of raw [`SourceMessage`]s.
pub type BoxSourceMessageStream =
    BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
/// Stream of parsed [`StreamChunk`]s.
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
/// A [`StreamChunk`] paired with the latest state of the splits it was produced from.
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
pub type BoxSourceChunkWithStateStream =
    BoxStream<'static, crate::error::ConnectorResult<StreamChunkWithState>>;

pub type BoxStreamingFileSourceChunkStream =
    BoxStream<'static, crate::error::ConnectorResult<Option<StreamChunk>>>;

/// Trait alias for streams of parsed chunks.
pub trait SourceChunkStream:
    Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}
impl<T> SourceChunkStream for T where
    T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}

pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;

/// [`SplitReader`] consumes one or more assigned splits and turns them into a
/// stream of parsed [`StreamChunk`]s.
#[async_trait]
pub trait SplitReader: Sized + Send {
    type Properties;
    type Split: SplitMetaData;

    async fn new(
        properties: Self::Properties,
        state: Vec<Self::Split>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> crate::error::ConnectorResult<Self>;

    fn into_stream(self) -> BoxSourceChunkStream;

    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        HashMap::new()
    }

    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        Err(anyhow!("seek_to_latest is not supported for this connector").into())
    }
}

#[derive(Debug, Clone)]
pub enum BackfillInfo {
    HasDataToBackfill {
        /// The latest available offset of the split when the reader was
        /// created, used to decide when backfill is finished.
        latest_offset: String,
    },
    /// The split has no data at all, so there is nothing to backfill.
    NoDataToBackfill,
}

for_all_sources!(impl_connector_properties);

impl Default for ConnectorProperties {
    fn default() -> Self {
        ConnectorProperties::Test(Box::default())
    }
}

impl ConnectorProperties {
    /// Creates typed source properties from the raw `WITH` properties, after
    /// resolving secret references. It inspects the `connector` field and
    /// dispatches to the corresponding type's `try_from_btreemap`.
    pub fn extract(
        with_properties: WithOptionsSecResolved,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let (options, secret_refs) = with_properties.into_parts();
        let mut options_with_secret =
            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
        let connector = options_with_secret
            .remove(UPSTREAM_SOURCE_KEY)
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::try_from_btreemap(options_with_secret, deny_unknown_fields)
                .map(ConnectorProperties::from),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn enforce_secret_source(
        with_properties: &impl WithPropertiesExt,
    ) -> crate::error::ConnectorResult<()> {
        let connector = with_properties
            .get_connector()
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        let key_iter = with_properties.key_iter();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::enforce_secret(key_iter),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    /// Whether the connector supports dropping splits, e.g. when upstream
    /// shards are merged or deleted.
    pub fn enable_drop_split(&self) -> bool {
        matches!(
            self,
            ConnectorProperties::Kinesis(_) | ConnectorProperties::Nats(_)
        )
    }

    /// Whether the connector supports adaptive splits.
    pub fn enable_adaptive_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Nats(_))
    }

    pub fn init_from_pb_source(&mut self, source: &PbSource) {
        dispatch_source_prop!(self, |prop| prop.init_from_pb_source(source))
    }

    pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
        dispatch_source_prop!(self, |prop| prop
            .init_from_pb_cdc_table_desc(cdc_table_desc))
    }

    /// Whether a single [`SplitReader`] can handle multiple splits at once.
    pub fn support_multiple_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Kafka(_))
            || matches!(self, ConnectorProperties::OpendalS3(_))
            || matches!(self, ConnectorProperties::Gcs(_))
            || matches!(self, ConnectorProperties::Azblob(_))
    }

    pub async fn create_split_enumerator(
        self,
        context: crate::source::base::SourceEnumeratorContextRef,
    ) -> crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
        let enumerator: Box<dyn AnySplitEnumerator> = dispatch_source_prop!(self, |prop| Box::new(
            <PropType as SourceProperties>::SplitEnumerator::new(*prop, context).await?
        ));
        Ok(enumerator)
    }

    pub async fn create_split_reader(
        self,
        splits: Vec<SplitImpl>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
        mut opt: crate::source::CreateSplitReaderOpt,
    ) -> Result<(BoxSourceChunkStream, crate::source::CreateSplitReaderResult)> {
        opt.support_multiple_splits = self.support_multiple_splits();
        tracing::debug!(
            ?splits,
            support_multiple_splits = opt.support_multiple_splits,
            "spawning connector split reader",
        );

        dispatch_source_prop!(self, |prop| create_split_readers(
            *prop,
            splits,
            parser_config,
            source_ctx,
            columns,
            opt
        )
        .await)
    }
}

for_all_sources!(impl_split);
for_all_connections!(impl_connection);

impl From<&SplitImpl> for ConnectorSplit {
    fn from(split: &SplitImpl) -> Self {
        dispatch_split_impl!(split, |inner| {
            ConnectorSplit {
                split_type: String::from(PropType::SOURCE_NAME),
                encoded_split: inner.encode_to_bytes().to_vec(),
            }
        })
    }
}

impl TryFrom<&ConnectorSplit> for SplitImpl {
    type Error = crate::error::ConnectorError;

    fn try_from(split: &ConnectorSplit) -> std::result::Result<Self, Self::Error> {
        let split_type = split.split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            {
                <PropType as SourceProperties>::Split::restore_from_bytes(
                    split.encoded_split.as_ref(),
                )
                .map(Into::into)
            },
            |other| bail!("connector '{}' is not supported", other)
        )
    }
}

impl SplitImpl {
    fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
        let split_type = split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            <PropType as SourceProperties>::Split::restore_from_json(value).map(Into::into),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn is_cdc_split(&self) -> bool {
        matches!(
            self,
            MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) | SqlServerCdc(_)
        )
    }

    /// Get the current offset of a CDC split. Panics if the split is not a CDC split.
    pub fn get_cdc_split_offset(&self) -> String {
        match self {
            MysqlCdc(split) => split.start_offset().clone().unwrap_or_default(),
            PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(),
            MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(),
            CitusCdc(split) => split.start_offset().clone().unwrap_or_default(),
            SqlServerCdc(split) => split.start_offset().clone().unwrap_or_default(),
            _ => unreachable!("get_cdc_split_offset() is only for cdc split"),
        }
    }

    pub fn into_batch_split(self) -> Option<BatchSourceSplitImpl> {
        #[expect(clippy::match_single_binding)]
        match self {
            // No connector currently converts to a batch split; add match arms
            // here as batch-enabled splits are introduced.
            _ => None,
        }
    }
}

impl SplitMetaData for SplitImpl {
    fn id(&self) -> SplitId {
        dispatch_split_impl!(self, |inner| inner.id())
    }

    fn encode_to_json(&self) -> JsonbVal {
        let inner = self.encode_to_json_inner().take();
        json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner }).into()
    }

    fn restore_from_json(value: JsonbVal) -> Result<Self> {
        let mut value = value.take();
        let json_obj = value.as_object_mut().unwrap();
        let split_type = json_obj
            .remove(SPLIT_TYPE_FIELD)
            .unwrap()
            .as_str()
            .unwrap()
            .to_owned();
        let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
        Self::restore_from_json_inner(&split_type, inner_value.into())
    }

    fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset))
    }
}

impl SplitImpl {
    pub fn get_type(&self) -> String {
        dispatch_split_impl!(self, |_inner| PropType::SOURCE_NAME.to_owned())
    }

    pub fn update_in_place(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset)?);
        Ok(())
    }

    pub fn encode_to_json_inner(&self) -> JsonbVal {
        dispatch_split_impl!(self, |inner| inner.encode_to_json())
    }
}

use risingwave_common::types::DataType;

#[derive(Clone, Debug)]
pub struct Column {
    pub name: String,
    pub data_type: DataType,
    pub is_visible: bool,
}

/// Split id resides in every source message; use `Arc<str>` to avoid copying.
pub type SplitId = Arc<str>;

/// A raw message pulled from the external source, before parsing.
#[derive(Debug, Clone)]
pub struct SourceMessage {
    pub key: Option<Vec<u8>>,
    pub payload: Option<Vec<u8>>,
    /// The offset of the message, whose format is connector-specific.
    pub offset: String,
    pub split_id: SplitId,
    pub meta: SourceMeta,
}

impl SourceMessage {
    /// Create a dummy `SourceMessage` for testing purposes.
    pub fn dummy() -> Self {
        Self {
            key: None,
            payload: None,
            offset: "".to_owned(),
            split_id: "".into(),
            meta: SourceMeta::Empty,
        }
    }

    /// Check whether the message is a CDC heartbeat, i.e. it carries neither
    /// key nor payload.
    pub fn is_cdc_heartbeat(&self) -> bool {
        self.key.is_none() && self.payload.is_none()
    }
}

#[derive(Debug, Clone)]
pub enum SourceMeta {
    Kafka(KafkaMeta),
    Kinesis(KinesisMeta),
    Pulsar(PulsarMeta),
    Nexmark(NexmarkMeta),
    GooglePubsub(GooglePubsubMeta),
    Datagen(DatagenMeta),
    DebeziumCdc(DebeziumCdcMeta),
    Nats(NatsMeta),
    Empty,
}

/// Implement `PartialEq` manually so that the `meta` field is ignored.
impl PartialEq for SourceMessage {
    fn eq(&self, other: &Self) -> bool {
        self.offset == other.offset
            && self.split_id == other.split_id
            && self.payload == other.payload
    }
}
impl Eq for SourceMessage {}

/// The metadata of a split. Splits are persisted as JSONB values, so every
/// implementor must round-trip through [`JsonbVal`].
pub trait SplitMetaData: Sized {
    fn id(&self) -> SplitId;
    fn encode_to_bytes(&self) -> Bytes {
        self.encode_to_json()
            .as_scalar_ref()
            .value_serialize()
            .into()
    }
    fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
        Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
    }

    /// Encode the whole split metadata to a JSON object.
    fn encode_to_json(&self) -> JsonbVal;
    fn restore_from_json(value: JsonbVal) -> Result<Self>;
    fn update_offset(&mut self, last_seen_offset: String) -> crate::error::ConnectorResult<()>;
}

/// [`ConnectorState`] maintains the splits assigned to a split reader, usually
/// a list of [`SplitImpl`] with each split's current offset.
pub type ConnectorState = Option<Vec<SplitImpl>>;

#[cfg(test)]
mod tests {
    use maplit::*;
    use nexmark::event::EventType;

    use super::*;
    use crate::source::cdc::{DebeziumCdcSplit, Mysql};
    use crate::source::kafka::KafkaSplit;

    #[test]
    fn test_split_impl_get_fn() -> Result<()> {
        let split = KafkaSplit::new(0, Some(0), Some(0), "demo".to_owned());
        let split_impl = SplitImpl::Kafka(split.clone());
        let get_value = split_impl.into_kafka().unwrap();
        println!("{:?}", get_value);
        assert_eq!(split.encode_to_bytes(), get_value.encode_to_bytes());
        assert_eq!(split.encode_to_json(), get_value.encode_to_json());

        Ok(())
    }
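
    // A minimal sketch (not from the original file) of the `ConnectorSplit`
    // protobuf round-trip: `From<&SplitImpl>` tags the split with its
    // connector name, and `TryFrom<&ConnectorSplit>` restores it from the
    // encoded bytes.
    #[test]
    fn test_connector_split_roundtrip() -> Result<()> {
        let split = KafkaSplit::new(0, Some(0), Some(0), "demo".to_owned());
        let split_impl = SplitImpl::Kafka(split);
        let pb_split = ConnectorSplit::from(&split_impl);
        assert_eq!(pb_split.split_type, "kafka");
        let restored = SplitImpl::try_from(&pb_split)?;
        assert_eq!(split_impl.encode_to_bytes(), restored.encode_to_bytes());
        Ok(())
    }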

    #[test]
    fn test_cdc_split_state() -> Result<()> {
        let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
        let split = DebeziumCdcSplit::<Mysql>::new(1001, Some(offset_str.to_owned()), None);
        let split_impl = SplitImpl::MysqlCdc(split);
        let encoded_split = split_impl.encode_to_bytes();
        let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );

        let encoded_split = split_impl.encode_to_json();
        let restored_split_impl = SplitImpl::restore_from_json(encoded_split)?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );
        Ok(())
    }
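
    // A minimal sketch (assuming `PbStreamSourceInfo::default()` leaves the
    // legacy `row_format` unset) showing `extract_source_struct` reading the
    // newer `format` + `row_encode` pair.
    #[test]
    fn test_extract_source_struct_format_encode() {
        use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};

        let info = PbStreamSourceInfo {
            format: PbFormatType::Upsert as i32,
            row_encode: PbEncodeType::Json as i32,
            ..Default::default()
        };
        let source_struct = extract_source_struct(&info).unwrap();
        assert_eq!(source_struct.format, SourceFormat::Upsert);
        assert_eq!(source_struct.encode, SourceEncode::Json);
    }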

    #[test]
    fn test_extract_nexmark_config() {
        let props = convert_args!(btreemap!(
            "connector" => "nexmark",
            "nexmark.table.type" => "Person",
            "nexmark.split.num" => "1",
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();

        if let ConnectorProperties::Nexmark(props) = props {
            assert_eq!(props.table_type, Some(EventType::Person));
            assert_eq!(props.split_num, 1);
        } else {
            panic!("extract nexmark config failed");
        }
    }
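
    // A minimal sketch of the `deny_unknown_fields` path in
    // `TryFromBTreeMap::try_from_btreemap`: an unrecognized key in the WITH
    // clause should be rejected when `deny_unknown_fields` is `true`. The key
    // `nonexistent.option` is hypothetical.
    #[test]
    fn test_extract_deny_unknown_fields() {
        let props = convert_args!(btreemap!(
            "connector" => "nexmark",
            "nexmark.table.type" => "Person",
            "nexmark.split.num" => "1",
            "nonexistent.option" => "oops",
        ));

        let res =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true);
        assert!(res.is_err());
    }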

    #[test]
    fn test_extract_kafka_config() {
        let props = convert_args!(btreemap!(
            "connector" => "kafka",
            "properties.bootstrap.server" => "b1,b2",
            "topic" => "test",
            "scan.startup.mode" => "earliest",
            "broker.rewrite.endpoints" => r#"{"b-1:9092":"dns-1", "b-2:9092":"dns-2"}"#,
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();
        if let ConnectorProperties::Kafka(k) = props {
            let btreemap = btreemap! {
                "b-1:9092".to_owned() => "dns-1".to_owned(),
                "b-2:9092".to_owned() => "dns-2".to_owned(),
            };
            assert_eq!(k.privatelink_common.broker_rewrite_map, Some(btreemap));
        } else {
            panic!("extract kafka config failed");
        }
    }
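
    // A minimal sketch of the CDC helpers on `SplitImpl`: a MySQL CDC split is
    // recognized by `is_cdc_split`, and with no offset seen yet,
    // `get_cdc_split_offset` falls back to the default (empty) string.
    #[test]
    fn test_cdc_split_offset_helpers() {
        let split = DebeziumCdcSplit::<Mysql>::new(1001, None, None);
        let split_impl = SplitImpl::MysqlCdc(split);
        assert!(split_impl.is_cdc_split());
        assert_eq!(split_impl.get_cdc_split_offset(), "");
    }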

    #[test]
    fn test_extract_cdc_properties() {
        let user_props_mysql = convert_args!(btreemap!(
            "connector" => "mysql-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "3306",
            "database.user" => "root",
            "database.password" => "123456",
            "database.name" => "mydb",
            "table.name" => "products",
        ));

        let user_props_postgres = convert_args!(btreemap!(
            "connector" => "postgres-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "5432",
            "database.user" => "root",
            "database.password" => "654321",
            "schema.name" => "public",
            "database.name" => "mypgdb",
            "table.name" => "orders",
        ));

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_mysql),
            true,
        )
        .unwrap();
        if let ConnectorProperties::MysqlCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "3306");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "123456");
            assert_eq!(c.properties.get("database.name").unwrap(), "mydb");
            assert_eq!(c.properties.get("table.name").unwrap(), "products");
        } else {
            panic!("extract cdc config failed");
        }

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_postgres),
            true,
        )
        .unwrap();
        if let ConnectorProperties::PostgresCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "5432");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "654321");
            assert_eq!(c.properties.get("schema.name").unwrap(), "public");
            assert_eq!(c.properties.get("database.name").unwrap(), "mypgdb");
            assert_eq!(c.properties.get("table.name").unwrap(), "orders");
        } else {
            panic!("extract cdc config failed");
        }
    }
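
    // A minimal sketch of `SourceMessage` semantics: a message with neither
    // key nor payload counts as a CDC heartbeat, and equality compares offset,
    // split id, and payload while ignoring `meta`.
    #[test]
    fn test_source_message_heartbeat_and_eq() {
        let heartbeat = SourceMessage::dummy();
        assert!(heartbeat.is_cdc_heartbeat());

        let mut with_payload = SourceMessage::dummy();
        with_payload.payload = Some(b"{}".to_vec());
        assert!(!with_payload.is_cdc_heartbeat());
        assert_ne!(heartbeat, with_payload);
    }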
}