risingwave_connector/source/base.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use rw_futures_util::select_all;
use serde::de::DeserializeOwned;
use serde_json::json;
use tokio::sync::mpsc;

use super::cdc::DebeziumCdcMeta;
use super::datagen::DatagenMeta;
use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::kinesis::KinesisMeta;
use super::monitor::SourceMetrics;
use super::nats::source::NatsMeta;
use super::nexmark::source::message::NexmarkMeta;
use super::pulsar::source::PulsarMeta;
use super::{AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult as Result;
use crate::parser::ParserConfig;
use crate::parser::schema_change::SchemaChangeEnvelope;
use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
use crate::source::monitor::EnumeratorMetrics;
use crate::with_options::WithOptions;
use crate::{
    WithOptionsSecResolved, WithPropertiesExt, dispatch_source_prop, dispatch_split_impl,
    for_all_connections, for_all_sources, impl_connection, impl_connector_properties, impl_split,
    match_source_name_str,
};

const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

pub const WEBHOOK_CONNECTOR: &str = "webhook";

pub trait TryFromBTreeMap: Sized + UnknownFields {
    /// Used to initialize the source properties from the raw untyped `WITH` options.
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self>;
}

/// Represents `WITH` options for sources.
///
/// Each instance should add a `#[derive(with_options::WithOptions)]` marker.
pub trait SourceProperties:
    TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug + EnforceSecret
{
    const SOURCE_NAME: &'static str;
    type Split: SplitMetaData
        + TryFrom<SplitImpl, Error = crate::error::ConnectorError>
        + Into<SplitImpl>;
    type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
    type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

    /// Load additional info from `PbSource`. Currently only used by CDC.
    fn init_from_pb_source(&mut self, _source: &PbSource) {}

    /// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
    fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}

pub trait UnknownFields {
    /// Unrecognized fields in the `WITH` clause.
    fn unknown_fields(&self) -> HashMap<String, String>;
}

impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let json_value = serde_json::to_value(props)?;
        let res = serde_json::from_value::<P>(json_value)?;

        if !deny_unknown_fields || res.unknown_fields().is_empty() {
            Ok(res)
        } else {
            bail!(
                "Unknown fields in the WITH clause: {:?}",
                res.unknown_fields()
            )
        }
    }
}
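
// A property struct typically plugs into the blanket impl above as sketched below.
// This is an illustrative sketch only (not part of this module); it assumes the
// connector collects leftover keys with `#[serde(flatten)]`, which is the usual
// pattern, though individual connectors may differ.
//
// ```ignore
// #[derive(Clone, Debug, serde::Deserialize)]
// pub struct MyDemoProperties {
//     #[serde(rename = "demo.endpoint")]
//     pub endpoint: String,
//     /// Keys not matched by any named field land here.
//     #[serde(flatten)]
//     pub unknown_fields: HashMap<String, String>,
// }
//
// impl UnknownFields for MyDemoProperties {
//     fn unknown_fields(&self) -> HashMap<String, String> {
//         self.unknown_fields.clone()
//     }
// }
//
// // With `deny_unknown_fields == true`, a typo such as `demo.endpiont` is rejected:
// // let props = MyDemoProperties::try_from_btreemap(raw_with_options, true)?;
// ```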

#[derive(Default)]
pub struct CreateSplitReaderOpt {
    pub support_multiple_splits: bool,
    pub seek_to_latest: bool,
}

#[derive(Default)]
pub struct CreateSplitReaderResult {
    pub latest_splits: Option<Vec<SplitImpl>>,
    pub backfill_info: HashMap<SplitId, BackfillInfo>,
}

pub async fn create_split_readers<P: SourceProperties>(
    prop: P,
    splits: Vec<SplitImpl>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
    opt: CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> {
    let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
    let mut res = CreateSplitReaderResult {
        backfill_info: HashMap::new(),
        latest_splits: None,
    };
    if opt.support_multiple_splits {
        let mut reader = P::SplitReader::new(
            prop.clone(),
            splits,
            parser_config.clone(),
            source_ctx.clone(),
            columns.clone(),
        )
        .await?;
        if opt.seek_to_latest {
            res.latest_splits = Some(reader.seek_to_latest().await?);
        }
        res.backfill_info = reader.backfill_info();
        Ok((reader.into_stream().boxed(), res))
    } else {
        let mut readers = try_join_all(splits.into_iter().map(|split| {
            // TODO: is this reader split across multiple threads...? Realistically, we want
            // source_ctx to live in a single actor.
            P::SplitReader::new(
                prop.clone(),
                vec![split],
                parser_config.clone(),
                source_ctx.clone(),
                columns.clone(),
            )
        }))
        .await?;
        if opt.seek_to_latest {
            let mut latest_splits = vec![];
            for reader in &mut readers {
                latest_splits.extend(reader.seek_to_latest().await?);
            }
            res.latest_splits = Some(latest_splits);
        }
        res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
        Ok((
            select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
            res,
        ))
    }
}
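
// Illustrative sketch of how the function above is typically driven (in the real code
// path this happens via `ConnectorProperties::create_split_reader` further below; the
// variable names here are placeholders):
//
// ```ignore
// let (chunk_stream, result) = create_split_readers(
//     typed_props,          // P: SourceProperties
//     assigned_splits,      // Vec<SplitImpl> assigned to this reader
//     parser_config,
//     source_ctx,
//     None,                 // columns are only needed by some connectors
//     CreateSplitReaderOpt {
//         support_multiple_splits: true, // one reader handles all splits (e.g. Kafka)
//         seek_to_latest: false,
//     },
// )
// .await?;
// // `result.backfill_info` tells the caller, per split, whether there is data to backfill.
// ```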

/// [`SplitEnumerator`] fetches the split metadata from the external source service.
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
#[async_trait]
pub trait SplitEnumerator: Sized + Send {
    type Split: SplitMetaData + Send;
    type Properties;

    async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
    -> Result<Self>;
    async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
    /// Do some cleanup work when a fragment is dropped, e.g., drop Kafka consumer group.
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
    /// Do some cleanup work when a backfill fragment is finished, e.g., drop Kafka consumer group.
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
}
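
// A minimal, hypothetical enumerator to show the shape of the trait; `MyDemoProperties`
// and `MyDemoSplit` are invented names used only for illustration.
//
// ```ignore
// struct MyDemoSplitEnumerator {
//     splits: Vec<MyDemoSplit>,
// }
//
// #[async_trait]
// impl SplitEnumerator for MyDemoSplitEnumerator {
//     type Properties = MyDemoProperties;
//     type Split = MyDemoSplit;
//
//     async fn new(
//         _properties: Self::Properties,
//         _context: SourceEnumeratorContextRef,
//     ) -> Result<Self> {
//         // A real enumerator would query the external system (topics, partitions,
//         // tables, ...) here instead of starting empty.
//         Ok(Self { splits: vec![] })
//     }
//
//     async fn list_splits(&mut self) -> Result<Vec<Self::Split>> {
//         Ok(self.splits.clone())
//     }
// }
// ```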

pub type SourceContextRef = Arc<SourceContext>;
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;

/// Dyn-compatible [`SplitEnumerator`].
#[async_trait]
pub trait AnySplitEnumerator: Send {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
}

#[async_trait]
impl<T: SplitEnumerator<Split: Into<SplitImpl>>> AnySplitEnumerator for T {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
        SplitEnumerator::list_splits(self)
            .await
            .map(|s| s.into_iter().map(|s| s.into()).collect())
    }

    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_drop_fragments(self, _fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_finish_backfill(self, _fragment_ids).await
    }
}

/// The max size of a chunk yielded by source stream.
pub const MAX_CHUNK_SIZE: usize = 1024;

#[derive(Debug, Clone, Copy)]
pub struct SourceCtrlOpts {
    /// The max size of a chunk yielded by source stream.
    pub chunk_size: usize,
    /// Whether to allow splitting a transaction into multiple chunks to meet the `max_chunk_size`.
    pub split_txn: bool,
}

// The options in `SourceCtrlOpts` are too important to have a `Default` impl: banning it
// prevents any unintentional use of default values.
impl !Default for SourceCtrlOpts {}

impl SourceCtrlOpts {
    #[cfg(test)]
    pub fn for_test() -> Self {
        SourceCtrlOpts {
            chunk_size: 256,
            split_txn: false,
        }
    }
}
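
// Since `Default` is banned above, callers must spell out every field explicitly,
// e.g. (illustrative values):
//
// ```ignore
// let ctrl_opts = SourceCtrlOpts {
//     chunk_size: MAX_CHUNK_SIZE,
//     split_txn: false,
// };
// ```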

#[derive(Debug)]
pub struct SourceEnumeratorContext {
    pub info: SourceEnumeratorInfo,
    pub metrics: Arc<EnumeratorMetrics>,
}

impl SourceEnumeratorContext {
    /// Create a dummy `SourceEnumeratorContext` for testing purposes, or for situations
    /// where the real context doesn't matter.
    pub fn dummy() -> SourceEnumeratorContext {
        SourceEnumeratorContext {
            info: SourceEnumeratorInfo { source_id: 0 },
            metrics: Arc::new(EnumeratorMetrics::default()),
        }
    }
}

#[derive(Clone, Debug)]
pub struct SourceEnumeratorInfo {
    pub source_id: u32,
}

#[derive(Debug, Clone)]
pub struct SourceContext {
    pub actor_id: u32,
    pub source_id: TableId,
    pub fragment_id: u32,
    pub source_name: String,
    pub metrics: Arc<SourceMetrics>,
    pub source_ctrl_opts: SourceCtrlOpts,
    pub connector_props: ConnectorProperties,
291    // source parser put schema change event into this channel
    pub schema_change_tx:
        Option<mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>>,
}

impl SourceContext {
    pub fn new(
        actor_id: u32,
        source_id: TableId,
        fragment_id: u32,
        source_name: String,
        metrics: Arc<SourceMetrics>,
        source_ctrl_opts: SourceCtrlOpts,
        connector_props: ConnectorProperties,
        schema_change_channel: Option<
            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
        >,
    ) -> Self {
        Self {
            actor_id,
            source_id,
            fragment_id,
            source_name,
            metrics,
            source_ctrl_opts,
            connector_props,
            schema_change_tx: schema_change_channel,
        }
    }

    /// Create a dummy `SourceContext` for testing purposes, or for situations
    /// where the real context doesn't matter.
    pub fn dummy() -> Self {
        Self::new(
            0,
            TableId::new(0),
            0,
            "dummy".to_owned(),
            Arc::new(SourceMetrics::default()),
            SourceCtrlOpts {
                chunk_size: MAX_CHUNK_SIZE,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        )
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceFormat {
    #[default]
    Invalid,
    Native,
    None,
    Debezium,
    DebeziumMongo,
    Maxwell,
    Canal,
    Upsert,
    Plain,
}

/// Refer to [`crate::parser::EncodingProperties`]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceEncode {
    #[default]
    Invalid,
    Native,
    None,
    Avro,
    Csv,
    Protobuf,
    Json,
    Bytes,
    Parquet,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SourceStruct {
    pub format: SourceFormat,
    pub encode: SourceEncode,
}

impl SourceStruct {
    pub fn new(format: SourceFormat, encode: SourceEncode) -> Self {
        Self { format, encode }
    }
}

// Only returns a valid (format, encode) combination.
pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};

    // old version meta.
    if let Ok(format) = info.get_row_format() {
        let (format, encode) = match format {
            RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
            RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
            RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
            RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
            RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
            RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
            RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
            RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
            RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
            RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
            RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
            RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json),
            RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
            RowFormatType::RowUnspecified => unreachable!(),
        };
        return Ok(SourceStruct::new(format, encode));
    }
    let source_format = info.get_format()?;
    let source_encode = info.get_row_encode()?;
    let (format, encode) = match (source_format, source_encode) {
        (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Protobuf) => {
            (SourceFormat::Plain, SourceEncode::Protobuf)
        }
        (PbFormatType::Debezium, PbEncodeType::Json) => {
            (SourceFormat::Debezium, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro),
        (PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json),
        (PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv),
        (PbFormatType::Plain, PbEncodeType::Parquet) => {
            (SourceFormat::Plain, SourceEncode::Parquet)
        }
        (PbFormatType::Native, PbEncodeType::Native) => {
            (SourceFormat::Native, SourceEncode::Native)
        }
        (PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
        (PbFormatType::Debezium, PbEncodeType::Avro) => {
            (SourceFormat::Debezium, SourceEncode::Avro)
        }
        (PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json),
        (PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro),
        (PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
            (SourceFormat::DebeziumMongo, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes),
        (PbFormatType::Upsert, PbEncodeType::Protobuf) => {
            (SourceFormat::Upsert, SourceEncode::Protobuf)
        }
        (format, encode) => {
            bail!(
                "Unsupported combination of format {:?} and encode {:?}",
                format,
                encode
            );
        }
    };
    Ok(SourceStruct::new(format, encode))
}
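
// For example, a source declared with `FORMAT UPSERT ENCODE AVRO` carries
// `(PbFormatType::Upsert, PbEncodeType::Avro)` in its `PbStreamSourceInfo`, so (assuming
// `info` is such a `PbStreamSourceInfo`):
//
// ```ignore
// assert_eq!(
//     extract_source_struct(&info)?,
//     SourceStruct::new(SourceFormat::Upsert, SourceEncode::Avro)
// );
// ```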

/// Stream of [`SourceMessage`]. Messages flow through the stream in batches.
pub type BoxSourceMessageStream =
    BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
/// Stream of [`StreamChunk`]s parsed from the messages from the external source.
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
pub type BoxSourceChunkWithStateStream =
    BoxStream<'static, crate::error::ConnectorResult<StreamChunkWithState>>;

/// Stream of [`Option<StreamChunk>`]s parsed from the messages from the external source.
pub type BoxStreamingFileSourceChunkStream =
    BoxStream<'static, crate::error::ConnectorResult<Option<StreamChunk>>>;

// Manually expand the trait alias to improve IDE experience.
pub trait SourceChunkStream:
    Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}
impl<T> SourceChunkStream for T where
    T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}

pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;

/// [`SplitReader`] is an abstraction over the external connector's read interface and is also
/// responsible for parsing: it reads messages from the external system and transforms them into a
/// stream of parsed [`StreamChunk`]s.
#[async_trait]
pub trait SplitReader: Sized + Send {
    type Properties;
    type Split: SplitMetaData;

    async fn new(
        properties: Self::Properties,
        state: Vec<Self::Split>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> crate::error::ConnectorResult<Self>;

    fn into_stream(self) -> BoxSourceChunkStream;

    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        HashMap::new()
    }

    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        Err(anyhow!("seek_to_latest is not supported for this connector").into())
    }
}
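
// Illustrative sketch of how a split reader is consumed (the real driver lives in the
// source executor; names here are placeholders):
//
// ```ignore
// let reader = MyDemoSplitReader::new(props, splits, parser_config, source_ctx, None).await?;
// let mut chunk_stream = reader.into_stream();
// while let Some(chunk) = chunk_stream.next().await.transpose()? {
//     // Each `StreamChunk` respects `source_ctrl_opts.chunk_size` as its upper bound.
// }
// ```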

/// Information used to determine whether we should start and finish source backfill.
///
/// XXX: if a connector cannot provide the latest offsets (but we want to make it shareable),
/// perhaps we should ban blocking DDL for it.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
    HasDataToBackfill {
        /// The last available offset of this split (**inclusive**).
        ///
        /// This will be used to determine whether source backfill is finished when
        /// there are no _new_ messages coming from upstream `SourceExecutor`. Otherwise,
        /// blocking DDL cannot finish until new messages come.
        ///
        /// When there are upstream messages, we will use the latest offsets from the upstream.
        latest_offset: String,
    },
    /// If there are no messages in the split at all, we don't need to start backfill.
    /// In this case, there will be no message from the backfill stream either.
    /// If we started backfill, we cannot finish it until new messages come.
    /// So we mark this as a special case for optimization.
    NoDataToBackfill,
}
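
// Illustrative use of the map returned by `SplitReader::backfill_info` (the actual
// decision logic lives in the source backfill executor; this is a simplified sketch):
//
// ```ignore
// match backfill_info.get(&split_id) {
//     Some(BackfillInfo::NoDataToBackfill) => {
//         // Nothing to backfill for this split; it can be marked finished immediately.
//     }
//     Some(BackfillInfo::HasDataToBackfill { latest_offset }) => {
//         // Backfill this split until its offset reaches `latest_offset` (inclusive),
//         // or until newer offsets are observed from the upstream source.
//     }
//     None => {
//         // The connector reported no backfill info for this split.
//     }
// }
// ```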

for_all_sources!(impl_connector_properties);

impl Default for ConnectorProperties {
    fn default() -> Self {
        ConnectorProperties::Test(Box::default())
    }
}

impl ConnectorProperties {
    pub fn is_new_fs_connector_hash_map(with_properties: &HashMap<String, String>) -> bool {
        with_properties
            .get(UPSTREAM_SOURCE_KEY)
            .map(|s| {
                s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
                    || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
                    || s.eq_ignore_ascii_case(GCS_CONNECTOR)
                    || s.eq_ignore_ascii_case(AZBLOB_CONNECTOR)
            })
            .unwrap_or(false)
    }
}

impl ConnectorProperties {
    /// Creates typed source properties from the raw `WITH` properties.
    ///
    /// It checks the `connector` field, and then dispatches to the corresponding type's `try_from_btreemap` method.
    ///
    /// `deny_unknown_fields`: Since `WITH` options are persisted in meta, we do not deny unknown fields when restoring from
    /// existing data to avoid breaking backwards compatibility. We only deny unknown fields when creating new sources.
    pub fn extract(
        with_properties: WithOptionsSecResolved,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let (options, secret_refs) = with_properties.into_parts();
        let mut options_with_secret =
            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
        let connector = options_with_secret
            .remove(UPSTREAM_SOURCE_KEY)
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::try_from_btreemap(options_with_secret, deny_unknown_fields)
                .map(ConnectorProperties::from),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn enforce_secret_source(
        with_properties: &impl WithPropertiesExt,
    ) -> crate::error::ConnectorResult<()> {
        let connector = with_properties
            .get_connector()
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        let key_iter = with_properties.key_iter();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::enforce_secret(key_iter),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn enable_drop_split(&self) -> bool {
        // Enable split scale-in only for Kinesis and NATS.
        matches!(
            self,
            ConnectorProperties::Kinesis(_) | ConnectorProperties::Nats(_)
        )
    }

    /// For most connectors, this should be false. When enabled, RisingWave should not track any progress.
    pub fn enable_adaptive_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Nats(_))
    }

    /// Load additional info from `PbSource`. Currently only used by CDC.
    pub fn init_from_pb_source(&mut self, source: &PbSource) {
        dispatch_source_prop!(self, |prop| prop.init_from_pb_source(source))
    }

    /// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
    pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
        dispatch_source_prop!(self, |prop| prop
            .init_from_pb_cdc_table_desc(cdc_table_desc))
    }

    pub fn support_multiple_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Kafka(_))
            || matches!(self, ConnectorProperties::OpendalS3(_))
            || matches!(self, ConnectorProperties::Gcs(_))
            || matches!(self, ConnectorProperties::Azblob(_))
    }

    pub async fn create_split_enumerator(
        self,
        context: crate::source::base::SourceEnumeratorContextRef,
    ) -> crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
        let enumerator: Box<dyn AnySplitEnumerator> = dispatch_source_prop!(self, |prop| Box::new(
            <PropType as SourceProperties>::SplitEnumerator::new(*prop, context).await?
        ));
        Ok(enumerator)
    }

    pub async fn create_split_reader(
        self,
        splits: Vec<SplitImpl>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
        mut opt: crate::source::CreateSplitReaderOpt,
    ) -> Result<(BoxSourceChunkStream, crate::source::CreateSplitReaderResult)> {
        opt.support_multiple_splits = self.support_multiple_splits();
        tracing::debug!(
            ?splits,
            support_multiple_splits = opt.support_multiple_splits,
            "spawning connector split reader",
        );

        dispatch_source_prop!(self, |prop| create_split_readers(
            *prop,
            splits,
            parser_config,
            source_ctx,
            columns,
            opt
        )
        .await)
    }
}
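
// End-to-end sketch tying the pieces above together (illustrative; contexts and error
// handling are simplified):
//
// ```ignore
// // 1. Turn raw `WITH` options into typed connector properties.
// let props = ConnectorProperties::extract(with_options, /* deny_unknown_fields */ true)?;
//
// // 2. On the meta side: discover splits.
// let mut enumerator = props.clone().create_split_enumerator(enumerator_ctx).await?;
// let splits: Vec<SplitImpl> = enumerator.list_splits().await?;
//
// // 3. On the compute side: read the assigned splits as a chunk stream.
// let (chunk_stream, _result) = props
//     .create_split_reader(splits, parser_config, source_ctx, None, Default::default())
//     .await?;
// ```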

for_all_sources!(impl_split);
for_all_connections!(impl_connection);

impl From<&SplitImpl> for ConnectorSplit {
    fn from(split: &SplitImpl) -> Self {
        dispatch_split_impl!(split, |inner| {
            ConnectorSplit {
                split_type: String::from(PropType::SOURCE_NAME),
                encoded_split: inner.encode_to_bytes().to_vec(),
            }
        })
    }
}

impl TryFrom<&ConnectorSplit> for SplitImpl {
    type Error = crate::error::ConnectorError;

    fn try_from(split: &ConnectorSplit) -> std::result::Result<Self, Self::Error> {
        let split_type = split.split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            {
                <PropType as SourceProperties>::Split::restore_from_bytes(
                    split.encoded_split.as_ref(),
                )
                .map(Into::into)
            },
            |other| bail!("connector '{}' is not supported", other)
        )
    }
}

impl SplitImpl {
    fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
        let split_type = split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            <PropType as SourceProperties>::Split::restore_from_json(value).map(Into::into),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn is_cdc_split(&self) -> bool {
        matches!(
            self,
            MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) | SqlServerCdc(_)
        )
    }

    /// Get the current split offset.
    pub fn get_cdc_split_offset(&self) -> String {
        match self {
            MysqlCdc(split) => split.start_offset().clone().unwrap_or_default(),
            PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(),
            MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(),
            CitusCdc(split) => split.start_offset().clone().unwrap_or_default(),
            SqlServerCdc(split) => split.start_offset().clone().unwrap_or_default(),
            _ => unreachable!("get_cdc_split_offset() is only for cdc split"),
        }
    }
}

impl SplitMetaData for SplitImpl {
    fn id(&self) -> SplitId {
        dispatch_split_impl!(self, |inner| inner.id())
    }

    fn encode_to_json(&self) -> JsonbVal {
        use serde_json::json;
        let inner = self.encode_to_json_inner().take();
        json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
    }

    fn restore_from_json(value: JsonbVal) -> Result<Self> {
        let mut value = value.take();
        let json_obj = value.as_object_mut().unwrap();
        let split_type = json_obj
            .remove(SPLIT_TYPE_FIELD)
            .unwrap()
            .as_str()
            .unwrap()
            .to_owned();
        let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
        Self::restore_from_json_inner(&split_type, inner_value.into())
    }

    fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset))
    }
}
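
// The JSON envelope produced by `encode_to_json` above has two fields; the inner
// `split_info` payload is connector-specific (sketched here with the `json!` macro):
//
// ```ignore
// let envelope = json!({
//     "split_type": "kafka",                         // `SplitImpl::get_type()`
//     "split_info": { /* connector-specific split payload */ },
// });
// ```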

impl SplitImpl {
    pub fn get_type(&self) -> String {
        dispatch_split_impl!(self, |_inner| PropType::SOURCE_NAME.to_owned())
    }

    pub fn update_in_place(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset)?);
        Ok(())
    }

    pub fn encode_to_json_inner(&self) -> JsonbVal {
        dispatch_split_impl!(self, |inner| inner.encode_to_json())
    }
}

use risingwave_common::types::DataType;

#[derive(Clone, Debug)]
pub struct Column {
    pub name: String,
    pub data_type: DataType,
    /// This field is only used by datagen.
    pub is_visible: bool,
}

/// Split id resides in every source message; use `Arc` to avoid copying.
pub type SplitId = Arc<str>;

/// The message pumped from the external source service.
/// The third-party message structs will eventually be transformed into this struct.
#[derive(Debug, Clone)]
pub struct SourceMessage {
    pub key: Option<Vec<u8>>,
    pub payload: Option<Vec<u8>>,
    pub offset: String, // TODO: use `Arc<str>`
    pub split_id: SplitId,
    pub meta: SourceMeta,
}

impl SourceMessage {
    /// Create a dummy `SourceMessage` with all fields unset for testing purposes.
    pub fn dummy() -> Self {
        Self {
            key: None,
            payload: None,
            offset: "".to_owned(),
            split_id: "".into(),
            meta: SourceMeta::Empty,
        }
    }

    /// Check whether the source message is a CDC heartbeat message.
    pub fn is_cdc_heartbeat(&self) -> bool {
        self.key.is_none() && self.payload.is_none()
    }
}
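
// Quick illustration of the heartbeat check above:
//
// ```ignore
// let mut msg = SourceMessage::dummy();
// assert!(msg.is_cdc_heartbeat()); // neither key nor payload is set
// msg.payload = Some(b"{}".to_vec());
// assert!(!msg.is_cdc_heartbeat());
// ```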

#[derive(Debug, Clone)]
pub enum SourceMeta {
    Kafka(KafkaMeta),
    Kinesis(KinesisMeta),
    Pulsar(PulsarMeta),
    Nexmark(NexmarkMeta),
    GooglePubsub(GooglePubsubMeta),
    Datagen(DatagenMeta),
    DebeziumCdc(DebeziumCdcMeta),
    Nats(NatsMeta),
    // For sources that don't have metadata.
    Empty,
}

/// Implement `PartialEq` (and `Eq`) manually to ignore the `meta` field.
impl PartialEq for SourceMessage {
    fn eq(&self, other: &Self) -> bool {
        self.offset == other.offset
            && self.split_id == other.split_id
            && self.payload == other.payload
    }
}
impl Eq for SourceMessage {}

/// The metadata of a split.
pub trait SplitMetaData: Sized {
    fn id(&self) -> SplitId;
    fn encode_to_bytes(&self) -> Bytes {
        self.encode_to_json()
            .as_scalar_ref()
            .value_serialize()
            .into()
    }
    fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
        Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
    }

    /// Encode the whole split metadata to a JSON object
    fn encode_to_json(&self) -> JsonbVal;
    fn restore_from_json(value: JsonbVal) -> Result<Self>;
    fn update_offset(&mut self, last_seen_offset: String) -> crate::error::ConnectorResult<()>;
}
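
// Minimal, hypothetical `SplitMetaData` implementation backed by serde_json; the
// `MyDemoSplit` type is invented for illustration (real splits follow the same pattern).
//
// ```ignore
// #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
// struct MyDemoSplit {
//     id: String,
//     offset: Option<String>,
// }
//
// impl SplitMetaData for MyDemoSplit {
//     fn id(&self) -> SplitId {
//         self.id.as_str().into()
//     }
//
//     fn encode_to_json(&self) -> JsonbVal {
//         serde_json::to_value(self.clone()).unwrap().into()
//     }
//
//     fn restore_from_json(value: JsonbVal) -> Result<Self> {
//         serde_json::from_value(value.take()).map_err(Into::into)
//     }
//
//     fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
//         self.offset = Some(last_seen_offset);
//         Ok(())
//     }
// }
// ```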

/// [`ConnectorState`] maintains the consuming splits' info. In specific split readers,
/// `ConnectorState` cannot be [`None`] and contains one (for mq split readers) or many (for fs
/// split readers) [`SplitImpl`]. If no split is assigned to source executor, `ConnectorState` is
/// [`None`] and the created source stream will be a pending stream.
pub type ConnectorState = Option<Vec<SplitImpl>>;

#[cfg(test)]
mod tests {
    use maplit::*;
    use nexmark::event::EventType;

    use super::*;
    use crate::source::cdc::{DebeziumCdcSplit, Mysql};
    use crate::source::kafka::KafkaSplit;

    #[test]
    fn test_split_impl_get_fn() -> Result<()> {
        let split = KafkaSplit::new(0, Some(0), Some(0), "demo".to_owned());
        let split_impl = SplitImpl::Kafka(split.clone());
        let get_value = split_impl.into_kafka().unwrap();
        println!("{:?}", get_value);
        assert_eq!(split.encode_to_bytes(), get_value.encode_to_bytes());
        assert_eq!(split.encode_to_json(), get_value.encode_to_json());

        Ok(())
    }

    #[test]
    fn test_cdc_split_state() -> Result<()> {
        let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
        let split = DebeziumCdcSplit::<Mysql>::new(1001, Some(offset_str.to_owned()), None);
        let split_impl = SplitImpl::MysqlCdc(split);
        let encoded_split = split_impl.encode_to_bytes();
        let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );

        let encoded_split = split_impl.encode_to_json();
        let restored_split_impl = SplitImpl::restore_from_json(encoded_split)?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );
        Ok(())
    }

    #[test]
    fn test_extract_nexmark_config() {
        let props = convert_args!(btreemap!(
            "connector" => "nexmark",
            "nexmark.table.type" => "Person",
            "nexmark.split.num" => "1",
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();

        if let ConnectorProperties::Nexmark(props) = props {
            assert_eq!(props.table_type, Some(EventType::Person));
            assert_eq!(props.split_num, 1);
        } else {
            panic!("extract nexmark config failed");
        }
    }

    #[test]
    fn test_extract_kafka_config() {
        let props = convert_args!(btreemap!(
            "connector" => "kafka",
            "properties.bootstrap.server" => "b1,b2",
            "topic" => "test",
            "scan.startup.mode" => "earliest",
            "broker.rewrite.endpoints" => r#"{"b-1:9092":"dns-1", "b-2:9092":"dns-2"}"#,
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();
        if let ConnectorProperties::Kafka(k) = props {
            let btreemap = btreemap! {
                "b-1:9092".to_owned() => "dns-1".to_owned(),
                "b-2:9092".to_owned() => "dns-2".to_owned(),
            };
            assert_eq!(k.privatelink_common.broker_rewrite_map, Some(btreemap));
        } else {
            panic!("extract kafka config failed");
        }
    }

    #[test]
    fn test_extract_cdc_properties() {
        let user_props_mysql = convert_args!(btreemap!(
            "connector" => "mysql-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "3306",
            "database.user" => "root",
            "database.password" => "123456",
            "database.name" => "mydb",
            "table.name" => "products",
        ));

        let user_props_postgres = convert_args!(btreemap!(
            "connector" => "postgres-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "5432",
            "database.user" => "root",
            "database.password" => "654321",
            "schema.name" => "public",
            "database.name" => "mypgdb",
            "table.name" => "orders",
        ));

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_mysql),
            true,
        )
        .unwrap();
        if let ConnectorProperties::MysqlCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "3306");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "123456");
            assert_eq!(c.properties.get("database.name").unwrap(), "mydb");
            assert_eq!(c.properties.get("table.name").unwrap(), "products");
        } else {
            panic!("extract cdc config failed");
        }

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_postgres),
            true,
        )
        .unwrap();
        if let ConnectorProperties::PostgresCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "5432");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "654321");
            assert_eq!(c.properties.get("schema.name").unwrap(), "public");
            assert_eq!(c.properties.get("database.name").unwrap(), "mypgdb");
            assert_eq!(c.properties.get("table.name").unwrap(), "orders");
        } else {
            panic!("extract cdc config failed");
        }
    }
}