risingwave_connector/source/base.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use rw_futures_util::select_all;
use serde::de::DeserializeOwned;
use serde_json::json;
use tokio::sync::mpsc;

use super::cdc::DebeziumCdcMeta;
use super::datagen::DatagenMeta;
use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::kinesis::KinesisMeta;
use super::monitor::SourceMetrics;
use super::nats::source::NatsMeta;
use super::nexmark::source::message::NexmarkMeta;
use super::pulsar::source::PulsarMeta;
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult as Result;
use crate::parser::ParserConfig;
use crate::parser::schema_change::SchemaChangeEnvelope;
use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
use crate::source::batch::BatchSourceSplitImpl;
use crate::source::monitor::EnumeratorMetrics;
use crate::with_options::WithOptions;
use crate::{
    WithOptionsSecResolved, WithPropertiesExt, dispatch_source_prop, dispatch_split_impl,
    for_all_connections, for_all_sources, impl_connection, impl_connector_properties, impl_split,
    match_source_name_str,
};

const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

pub const WEBHOOK_CONNECTOR: &str = "webhook";

pub trait TryFromBTreeMap: Sized + UnknownFields {
    /// Used to initialize the source properties from the raw untyped `WITH` options.
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self>;
}

/// Represents `WITH` options for sources.
///
/// Each instance should add a `#[derive(with_options::WithOptions)]` marker.
pub trait SourceProperties:
    TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug + EnforceSecret
{
    const SOURCE_NAME: &'static str;
    type Split: SplitMetaData
        + TryFrom<SplitImpl, Error = crate::error::ConnectorError>
        + Into<SplitImpl>;
    type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
    type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

    /// Load additional info from `PbSource`. Currently only used by CDC.
    fn init_from_pb_source(&mut self, _source: &PbSource) {}

    /// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
    fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}

pub trait UnknownFields {
    /// Unrecognized fields in the `WITH` clause.
    fn unknown_fields(&self) -> HashMap<String, String>;
}

impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let json_value = serde_json::to_value(props)?;
        let res = serde_json::from_value::<P>(json_value)?;

        if !deny_unknown_fields || res.unknown_fields().is_empty() {
            Ok(res)
        } else {
            bail!(
                "Unknown fields in the WITH clause: {:?}",
                res.unknown_fields()
            )
        }
    }
}
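
// Illustrative sketch (not part of the crate): how a hypothetical connector's property struct
// plugs into the blanket `TryFromBTreeMap` impl above. Unrecognized `WITH` options are captured
// by a `#[serde(flatten)]` map and surfaced through `UnknownFields`, so that
// `deny_unknown_fields = true` can reject typos when a new source is created.
//
//     #[derive(Clone, Debug, serde::Deserialize)]
//     struct DemoProperties {
//         endpoint: String,
//         #[serde(flatten)]
//         unknown_fields: HashMap<String, String>,
//     }
//
//     impl UnknownFields for DemoProperties {
//         fn unknown_fields(&self) -> HashMap<String, String> {
//             self.unknown_fields.clone()
//         }
//     }
//
//     // `props` is the raw `WITH` clause, e.g. {"endpoint": "localhost:9999"}.
//     let typed = DemoProperties::try_from_btreemap(props, true)?;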

#[derive(Default)]
pub struct CreateSplitReaderOpt {
    pub support_multiple_splits: bool,
    pub seek_to_latest: bool,
}

#[derive(Default)]
pub struct CreateSplitReaderResult {
    pub latest_splits: Option<Vec<SplitImpl>>,
    pub backfill_info: HashMap<SplitId, BackfillInfo>,
}

pub async fn create_split_readers<P: SourceProperties>(
    prop: P,
    splits: Vec<SplitImpl>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
    opt: CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> {
    let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
    let mut res = CreateSplitReaderResult {
        backfill_info: HashMap::new(),
        latest_splits: None,
    };
    if opt.support_multiple_splits {
        let mut reader = P::SplitReader::new(
            prop.clone(),
            splits,
            parser_config.clone(),
            source_ctx.clone(),
            columns.clone(),
        )
        .await?;
        if opt.seek_to_latest {
            res.latest_splits = Some(reader.seek_to_latest().await?);
        }
        res.backfill_info = reader.backfill_info();
        Ok((reader.into_stream().boxed(), res))
    } else {
        let mut readers = try_join_all(splits.into_iter().map(|split| {
            // TODO: is this reader split across multiple threads...? Realistically, we want
            // source_ctx to live in a single actor.
            P::SplitReader::new(
                prop.clone(),
                vec![split],
                parser_config.clone(),
                source_ctx.clone(),
                columns.clone(),
            )
        }))
        .await?;
        if opt.seek_to_latest {
            let mut latest_splits = vec![];
            for reader in &mut readers {
                latest_splits.extend(reader.seek_to_latest().await?);
            }
            res.latest_splits = Some(latest_splits);
        }
        res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
        Ok((
            select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
            res,
        ))
    }
}

/// [`SplitEnumerator`] fetches the split metadata from the external source service.
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
#[async_trait]
pub trait SplitEnumerator: Sized + Send {
    type Split: SplitMetaData + Send;
    type Properties;

    async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
    -> Result<Self>;
    async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
    /// Do some cleanup work when a fragment is dropped, e.g., drop Kafka consumer group.
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
    /// Do some cleanup work when a backfill fragment is finished, e.g., drop Kafka consumer group.
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
}
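
// Illustrative sketch (hypothetical connector, not part of the crate): the minimal shape of a
// `SplitEnumerator` impl. `new` builds a client from the typed properties, and `list_splits`
// maps whatever the external system calls a "partition" into the connector's split type.
//
//     #[async_trait]
//     impl SplitEnumerator for DemoSplitEnumerator {
//         type Split = DemoSplit;
//         type Properties = DemoProperties;
//
//         async fn new(
//             properties: Self::Properties,
//             _context: SourceEnumeratorContextRef,
//         ) -> Result<Self> {
//             let client = DemoClient::connect(&properties.endpoint).await?;
//             Ok(Self { client })
//         }
//
//         async fn list_splits(&mut self) -> Result<Vec<DemoSplit>> {
//             let partitions = self.client.partitions().await?;
//             Ok(partitions.into_iter().map(DemoSplit::from).collect())
//         }
//     }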

pub type SourceContextRef = Arc<SourceContext>;
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;

/// Dyn-compatible [`SplitEnumerator`].
#[async_trait]
pub trait AnySplitEnumerator: Send {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
}

#[async_trait]
impl<T: SplitEnumerator<Split: Into<SplitImpl>>> AnySplitEnumerator for T {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
        SplitEnumerator::list_splits(self)
            .await
            .map(|s| s.into_iter().map(|s| s.into()).collect())
    }

    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_drop_fragments(self, _fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_finish_backfill(self, _fragment_ids).await
    }
}

/// The max size of a chunk yielded by the source stream.
pub const MAX_CHUNK_SIZE: usize = 1024;

#[derive(Debug, Clone, Copy)]
pub struct SourceCtrlOpts {
    /// The max size of a chunk yielded by the source stream.
    pub chunk_size: usize,
    /// Whether to allow splitting a transaction into multiple chunks to meet the `max_chunk_size`.
    pub split_txn: bool,
}

// The options in `SourceCtrlOpts` are important enough that we deliberately don't impl `Default`
// for it, preventing any unintentional use of default values.
impl !Default for SourceCtrlOpts {}

impl SourceCtrlOpts {
    #[cfg(test)]
    pub fn for_test() -> Self {
        SourceCtrlOpts {
            chunk_size: 256,
            split_txn: false,
        }
    }
}

#[derive(Debug)]
pub struct SourceEnumeratorContext {
    pub info: SourceEnumeratorInfo,
    pub metrics: Arc<EnumeratorMetrics>,
}

impl SourceEnumeratorContext {
    /// Create a dummy `SourceEnumeratorContext` for testing purposes, or for the situation
    /// where the real context doesn't matter.
    pub fn dummy() -> SourceEnumeratorContext {
        SourceEnumeratorContext {
            info: SourceEnumeratorInfo { source_id: 0 },
            metrics: Arc::new(EnumeratorMetrics::default()),
        }
    }
}

#[derive(Clone, Debug)]
pub struct SourceEnumeratorInfo {
    pub source_id: u32,
}

#[derive(Debug, Clone)]
pub struct SourceContext {
    pub actor_id: u32,
    pub source_id: TableId,
    pub fragment_id: u32,
    pub source_name: String,
    pub metrics: Arc<SourceMetrics>,
    pub source_ctrl_opts: SourceCtrlOpts,
    pub connector_props: ConnectorProperties,
    // The source parser puts schema change events into this channel.
    pub schema_change_tx:
        Option<mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>>,
}

impl SourceContext {
    pub fn new(
        actor_id: u32,
        source_id: TableId,
        fragment_id: u32,
        source_name: String,
        metrics: Arc<SourceMetrics>,
        source_ctrl_opts: SourceCtrlOpts,
        connector_props: ConnectorProperties,
        schema_change_channel: Option<
            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
        >,
    ) -> Self {
        Self {
            actor_id,
            source_id,
            fragment_id,
            source_name,
            metrics,
            source_ctrl_opts,
            connector_props,
            schema_change_tx: schema_change_channel,
        }
    }

    /// Create a dummy `SourceContext` for testing purposes, or for the situation
    /// where the real context doesn't matter.
    pub fn dummy() -> Self {
        Self::new(
            0,
            TableId::new(0),
            0,
            "dummy".to_owned(),
            Arc::new(SourceMetrics::default()),
            SourceCtrlOpts {
                chunk_size: MAX_CHUNK_SIZE,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        )
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceFormat {
    #[default]
    Invalid,
    Native,
    None,
    Debezium,
    DebeziumMongo,
    Maxwell,
    Canal,
    Upsert,
    Plain,
}

/// Refer to [`crate::parser::EncodingProperties`]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceEncode {
    #[default]
    Invalid,
    Native,
    None,
    Avro,
    Csv,
    Protobuf,
    Json,
    Bytes,
    Parquet,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SourceStruct {
    pub format: SourceFormat,
    pub encode: SourceEncode,
}

impl SourceStruct {
    pub fn new(format: SourceFormat, encode: SourceEncode) -> Self {
        Self { format, encode }
    }
}

// Only returns a valid (format, encode) pair.
pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};

    // Old-version meta.
    if let Ok(format) = info.get_row_format() {
        let (format, encode) = match format {
            RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
            RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
            RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
            RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
            RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
            RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
            RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
            RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
            RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
            RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
            RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
            RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json),
            RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
            RowFormatType::RowUnspecified => unreachable!(),
        };
        return Ok(SourceStruct::new(format, encode));
    }
    let source_format = info.get_format()?;
    let source_encode = info.get_row_encode()?;
    let (format, encode) = match (source_format, source_encode) {
        (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Protobuf) => {
            (SourceFormat::Plain, SourceEncode::Protobuf)
        }
        (PbFormatType::Debezium, PbEncodeType::Json) => {
            (SourceFormat::Debezium, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro),
        (PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json),
        (PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv),
        (PbFormatType::Plain, PbEncodeType::Parquet) => {
            (SourceFormat::Plain, SourceEncode::Parquet)
        }
        (PbFormatType::Native, PbEncodeType::Native) => {
            (SourceFormat::Native, SourceEncode::Native)
        }
        (PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
        (PbFormatType::Debezium, PbEncodeType::Avro) => {
            (SourceFormat::Debezium, SourceEncode::Avro)
        }
        (PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json),
        (PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro),
        (PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
            (SourceFormat::DebeziumMongo, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes),
        (PbFormatType::Upsert, PbEncodeType::Protobuf) => {
            (SourceFormat::Upsert, SourceEncode::Protobuf)
        }
        (format, encode) => {
            bail!(
                "Unsupported combination of format {:?} and encode {:?}",
                format,
                encode
            );
        }
    };
    Ok(SourceStruct::new(format, encode))
}
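
// For example, a source declared with `FORMAT UPSERT ENCODE AVRO` carries
// `(PbFormatType::Upsert, PbEncodeType::Avro)` in its `PbStreamSourceInfo`, which the match
// above maps to `SourceStruct { format: SourceFormat::Upsert, encode: SourceEncode::Avro }`.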

/// Stream of [`SourceMessage`]. Messages flow through the stream in batches.
pub type BoxSourceMessageStream =
    BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
/// Stream of [`StreamChunk`]s parsed from the messages from the external source.
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
/// `StreamChunk` with the latest split state.
/// The state is constructed in `StreamReaderBuilder::into_retry_stream`.
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
/// See [`StreamChunkWithState`].
pub type BoxSourceChunkWithStateStream =
    BoxStream<'static, crate::error::ConnectorResult<StreamChunkWithState>>;

/// Stream of [`Option<StreamChunk>`]s parsed from the messages from the external source.
pub type BoxStreamingFileSourceChunkStream =
    BoxStream<'static, crate::error::ConnectorResult<Option<StreamChunk>>>;

// Manually expand the trait alias to improve IDE experience.
pub trait SourceChunkStream:
    Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}
impl<T> SourceChunkStream for T where
    T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}

pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;

/// [`SplitReader`] is an abstraction of the external connector read interface. It is also
/// responsible for parsing: it reads messages from the external system and transforms them
/// into a stream of parsed [`StreamChunk`]s.
#[async_trait]
pub trait SplitReader: Sized + Send {
    type Properties;
    type Split: SplitMetaData;

    async fn new(
        properties: Self::Properties,
        state: Vec<Self::Split>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> crate::error::ConnectorResult<Self>;

    fn into_stream(self) -> BoxSourceChunkStream;

    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        HashMap::new()
    }

    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        Err(anyhow!("seek_to_latest is not supported for this connector").into())
    }
}
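
// Illustrative sketch (hypothetical connector, not part of the crate): the minimal shape of a
// `SplitReader` impl. `new` receives the splits assigned to this reader (with offsets restored
// from state), and `into_stream` yields `StreamChunk`s parsed according to `parser_config`.
// `into_chunk_stream` is an assumed helper, typically written with a `#[try_stream]` generator.
//
//     #[async_trait]
//     impl SplitReader for DemoSplitReader {
//         type Properties = DemoProperties;
//         type Split = DemoSplit;
//
//         async fn new(
//             properties: Self::Properties,
//             state: Vec<DemoSplit>,
//             parser_config: ParserConfig,
//             source_ctx: SourceContextRef,
//             _columns: Option<Vec<Column>>,
//         ) -> crate::error::ConnectorResult<Self> {
//             Ok(Self { properties, splits: state, parser_config, source_ctx })
//         }
//
//         fn into_stream(self) -> BoxSourceChunkStream {
//             // Poll the external system, feed the raw messages through the parser,
//             // and yield `StreamChunk`s.
//             self.into_chunk_stream().boxed()
//         }
//     }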

/// Information used to determine whether we should start and finish source backfill.
///
/// XXX: if a connector cannot provide the latest offsets (but we want to make it shareable),
/// perhaps we should ban blocking DDL for it.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
    HasDataToBackfill {
        /// The last available offset for each split (**inclusive**).
        ///
        /// This will be used to determine whether source backfill is finished when
        /// there are no _new_ messages coming from upstream `SourceExecutor`. Otherwise,
        /// blocking DDL cannot finish until new messages come.
        ///
        /// When there are upstream messages, we will use the latest offsets from the upstream.
        latest_offset: String,
    },
    /// If there are no messages in the split at all, we don't need to start backfill.
    /// In this case, there will be no message from the backfill stream either.
    /// If we started backfill, we could not finish it until new messages came,
    /// so we mark this as a special case for optimization.
    NoDataToBackfill,
}

for_all_sources!(impl_connector_properties);

impl Default for ConnectorProperties {
    fn default() -> Self {
        ConnectorProperties::Test(Box::default())
    }
}

impl ConnectorProperties {
    /// Creates typed source properties from the raw `WITH` properties.
    ///
    /// It checks the `connector` field, and then dispatches to the corresponding type's `try_from_btreemap` method.
    ///
    /// `deny_unknown_fields`: Since `WITH` options are persisted in meta, we do not deny unknown fields when restoring from
    /// existing data to avoid breaking backwards compatibility. We only deny unknown fields when creating new sources.
    pub fn extract(
        with_properties: WithOptionsSecResolved,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let (options, secret_refs) = with_properties.into_parts();
        let mut options_with_secret =
            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
        let connector = options_with_secret
            .remove(UPSTREAM_SOURCE_KEY)
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::try_from_btreemap(options_with_secret, deny_unknown_fields)
                .map(ConnectorProperties::from),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn enforce_secret_source(
        with_properties: &impl WithPropertiesExt,
    ) -> crate::error::ConnectorResult<()> {
        let connector = with_properties
            .get_connector()
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        let key_iter = with_properties.key_iter();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::enforce_secret(key_iter),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn enable_drop_split(&self) -> bool {
        // Enable split scale-in only for Kinesis and NATS.
        matches!(
            self,
            ConnectorProperties::Kinesis(_) | ConnectorProperties::Nats(_)
        )
    }

    /// For most connectors, this should be false. When enabled, RisingWave should not track any progress.
    pub fn enable_adaptive_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Nats(_))
    }

    /// Load additional info from `PbSource`. Currently only used by CDC.
    pub fn init_from_pb_source(&mut self, source: &PbSource) {
        dispatch_source_prop!(self, |prop| prop.init_from_pb_source(source))
    }

    /// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
    pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
        dispatch_source_prop!(self, |prop| prop
            .init_from_pb_cdc_table_desc(cdc_table_desc))
    }

    pub fn support_multiple_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Kafka(_))
            || matches!(self, ConnectorProperties::OpendalS3(_))
            || matches!(self, ConnectorProperties::Gcs(_))
            || matches!(self, ConnectorProperties::Azblob(_))
    }

    pub async fn create_split_enumerator(
        self,
        context: crate::source::base::SourceEnumeratorContextRef,
    ) -> crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
        let enumerator: Box<dyn AnySplitEnumerator> = dispatch_source_prop!(self, |prop| Box::new(
            <PropType as SourceProperties>::SplitEnumerator::new(*prop, context).await?
        ));
        Ok(enumerator)
    }

    pub async fn create_split_reader(
        self,
        splits: Vec<SplitImpl>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
        mut opt: crate::source::CreateSplitReaderOpt,
    ) -> Result<(BoxSourceChunkStream, crate::source::CreateSplitReaderResult)> {
        opt.support_multiple_splits = self.support_multiple_splits();
        tracing::debug!(
            ?splits,
            support_multiple_splits = opt.support_multiple_splits,
            "spawning connector split reader",
        );

        dispatch_source_prop!(self, |prop| create_split_readers(
            *prop,
            splits,
            parser_config,
            source_ctx,
            columns,
            opt
        )
        .await)
    }
}
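
// Illustrative sketch (not part of the crate): the end-to-end flow from raw `WITH` options to a
// running chunk stream. The enumerator half runs on the meta node and the reader half on compute
// nodes; `with_options`, `parser_config`, and `source_ctx` are assumed to be built elsewhere.
//
//     // On the meta node: discover splits and assign them to source actors.
//     let props = ConnectorProperties::extract(with_options.clone(), true)?;
//     let mut enumerator = props
//         .create_split_enumerator(Arc::new(SourceEnumeratorContext::dummy()))
//         .await?;
//     let splits: Vec<SplitImpl> = enumerator.list_splits().await?;
//
//     // On a compute node: turn the assigned splits into a stream of `StreamChunk`s.
//     let props = ConnectorProperties::extract(with_options, false)?;
//     let (chunk_stream, _res) = props
//         .create_split_reader(splits, parser_config, source_ctx, None, Default::default())
//         .await?;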

for_all_sources!(impl_split);
for_all_connections!(impl_connection);

impl From<&SplitImpl> for ConnectorSplit {
    fn from(split: &SplitImpl) -> Self {
        dispatch_split_impl!(split, |inner| {
            ConnectorSplit {
                split_type: String::from(PropType::SOURCE_NAME),
                encoded_split: inner.encode_to_bytes().to_vec(),
            }
        })
    }
}

impl TryFrom<&ConnectorSplit> for SplitImpl {
    type Error = crate::error::ConnectorError;

    fn try_from(split: &ConnectorSplit) -> std::result::Result<Self, Self::Error> {
        let split_type = split.split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            {
                <PropType as SourceProperties>::Split::restore_from_bytes(
                    split.encoded_split.as_ref(),
                )
                .map(Into::into)
            },
            |other| bail!("connector '{}' is not supported", other)
        )
    }
}

impl SplitImpl {
    fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
        let split_type = split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            <PropType as SourceProperties>::Split::restore_from_json(value).map(Into::into),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn is_cdc_split(&self) -> bool {
        matches!(
            self,
            MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) | SqlServerCdc(_)
        )
    }

    /// Get the current split offset.
    pub fn get_cdc_split_offset(&self) -> String {
        match self {
            MysqlCdc(split) => split.start_offset().clone().unwrap_or_default(),
            PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(),
            MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(),
            CitusCdc(split) => split.start_offset().clone().unwrap_or_default(),
            SqlServerCdc(split) => split.start_offset().clone().unwrap_or_default(),
            _ => unreachable!("get_cdc_split_offset() is only for cdc split"),
        }
    }

    pub fn into_batch_split(self) -> Option<BatchSourceSplitImpl> {
        #[expect(clippy::match_single_binding)]
        match self {
            // SplitImpl::BatchPosixFs(batch_posix_fs_split) => {
            //     Some(BatchSourceSplitImpl::BatchPosixFs(batch_posix_fs_split))
            // }
            _ => None,
        }
    }
}

impl SplitMetaData for SplitImpl {
    fn id(&self) -> SplitId {
        dispatch_split_impl!(self, |inner| inner.id())
    }

    fn encode_to_json(&self) -> JsonbVal {
        use serde_json::json;
        let inner = self.encode_to_json_inner().take();
        json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
    }

    fn restore_from_json(value: JsonbVal) -> Result<Self> {
        let mut value = value.take();
        let json_obj = value.as_object_mut().unwrap();
        let split_type = json_obj
            .remove(SPLIT_TYPE_FIELD)
            .unwrap()
            .as_str()
            .unwrap()
            .to_owned();
        let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
        Self::restore_from_json_inner(&split_type, inner_value.into())
    }

    fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset))
    }
}

impl SplitImpl {
    pub fn get_type(&self) -> String {
        dispatch_split_impl!(self, |_inner| PropType::SOURCE_NAME.to_owned())
    }

    pub fn update_in_place(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset)?);
        Ok(())
    }

    pub fn encode_to_json_inner(&self) -> JsonbVal {
        dispatch_split_impl!(self, |inner| inner.encode_to_json())
    }
}

use risingwave_common::types::DataType;

#[derive(Clone, Debug)]
pub struct Column {
    pub name: String,
    pub data_type: DataType,
    /// This field is only used by datagen.
    pub is_visible: bool,
}

/// A split id resides in every source message; we use `Arc` to avoid copying.
pub type SplitId = Arc<str>;

/// The message pumped from the external source service.
/// The third-party message structs will eventually be transformed into this struct.
#[derive(Debug, Clone)]
pub struct SourceMessage {
    pub key: Option<Vec<u8>>,
    pub payload: Option<Vec<u8>>,
    pub offset: String, // TODO: use `Arc<str>`
    pub split_id: SplitId,
    pub meta: SourceMeta,
}

impl SourceMessage {
    /// Create a dummy `SourceMessage` with all fields unset for testing purposes.
    pub fn dummy() -> Self {
        Self {
            key: None,
            payload: None,
            offset: "".to_owned(),
            split_id: "".into(),
            meta: SourceMeta::Empty,
        }
    }

    /// Check whether the source message is a CDC heartbeat message.
    pub fn is_cdc_heartbeat(&self) -> bool {
        self.key.is_none() && self.payload.is_none()
    }
}

#[derive(Debug, Clone)]
pub enum SourceMeta {
    Kafka(KafkaMeta),
    Kinesis(KinesisMeta),
    Pulsar(PulsarMeta),
    Nexmark(NexmarkMeta),
    GooglePubsub(GooglePubsubMeta),
    Datagen(DatagenMeta),
    DebeziumCdc(DebeziumCdcMeta),
    Nats(NatsMeta),
    // For sources that don't have metadata.
    Empty,
}

/// Implement `PartialEq`/`Eq` manually to ignore the `meta` field.
impl PartialEq for SourceMessage {
    fn eq(&self, other: &Self) -> bool {
        self.offset == other.offset
            && self.split_id == other.split_id
            && self.payload == other.payload
    }
}
impl Eq for SourceMessage {}

/// The metadata of a split.
pub trait SplitMetaData: Sized {
    fn id(&self) -> SplitId;
    fn encode_to_bytes(&self) -> Bytes {
        self.encode_to_json()
            .as_scalar_ref()
            .value_serialize()
            .into()
    }
    fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
        Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
    }

    /// Encode the whole split metadata to a JSON object
    fn encode_to_json(&self) -> JsonbVal;
    fn restore_from_json(value: JsonbVal) -> Result<Self>;
    fn update_offset(&mut self, last_seen_offset: String) -> crate::error::ConnectorResult<()>;
}
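
// Illustrative sketch (hypothetical split type, not part of the crate): a `SplitMetaData` impl
// that round-trips through JSON, which is how split state is persisted and recovered.
//
//     #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
//     struct DemoSplit {
//         partition: String,
//         start_offset: Option<String>,
//     }
//
//     impl SplitMetaData for DemoSplit {
//         fn id(&self) -> SplitId {
//             self.partition.as_str().into()
//         }
//
//         fn encode_to_json(&self) -> JsonbVal {
//             serde_json::to_value(self.clone()).unwrap().into()
//         }
//
//         fn restore_from_json(value: JsonbVal) -> Result<Self> {
//             serde_json::from_value(value.take()).map_err(Into::into)
//         }
//
//         fn update_offset(&mut self, last_seen_offset: String) -> crate::error::ConnectorResult<()> {
//             self.start_offset = Some(last_seen_offset);
//             Ok(())
//         }
//     }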

/// [`ConnectorState`] maintains the consuming splits' info. In specific split readers,
/// `ConnectorState` cannot be [`None`] and contains one (for MQ split readers) or many (for FS
/// split readers) [`SplitImpl`]s. If no split is assigned to the source executor, `ConnectorState`
/// is [`None`] and the created source stream will be a pending stream.
pub type ConnectorState = Option<Vec<SplitImpl>>;

#[cfg(test)]
mod tests {
    use maplit::*;
    use nexmark::event::EventType;

    use super::*;
    use crate::source::cdc::{DebeziumCdcSplit, Mysql};
    use crate::source::kafka::KafkaSplit;

    #[test]
    fn test_split_impl_get_fn() -> Result<()> {
        let split = KafkaSplit::new(0, Some(0), Some(0), "demo".to_owned());
        let split_impl = SplitImpl::Kafka(split.clone());
        let get_value = split_impl.into_kafka().unwrap();
        println!("{:?}", get_value);
        assert_eq!(split.encode_to_bytes(), get_value.encode_to_bytes());
        assert_eq!(split.encode_to_json(), get_value.encode_to_json());

        Ok(())
    }

    #[test]
    fn test_cdc_split_state() -> Result<()> {
        let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
        let split = DebeziumCdcSplit::<Mysql>::new(1001, Some(offset_str.to_owned()), None);
        let split_impl = SplitImpl::MysqlCdc(split);
        let encoded_split = split_impl.encode_to_bytes();
        let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );

        let encoded_split = split_impl.encode_to_json();
        let restored_split_impl = SplitImpl::restore_from_json(encoded_split)?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );
        Ok(())
    }

    #[test]
    fn test_extract_nexmark_config() {
        let props = convert_args!(btreemap!(
            "connector" => "nexmark",
            "nexmark.table.type" => "Person",
            "nexmark.split.num" => "1",
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();

        if let ConnectorProperties::Nexmark(props) = props {
            assert_eq!(props.table_type, Some(EventType::Person));
            assert_eq!(props.split_num, 1);
        } else {
            panic!("extract nexmark config failed");
        }
    }

    #[test]
    fn test_extract_kafka_config() {
        let props = convert_args!(btreemap!(
            "connector" => "kafka",
            "properties.bootstrap.server" => "b1,b2",
            "topic" => "test",
            "scan.startup.mode" => "earliest",
            "broker.rewrite.endpoints" => r#"{"b-1:9092":"dns-1", "b-2:9092":"dns-2"}"#,
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();
        if let ConnectorProperties::Kafka(k) = props {
            let btreemap = btreemap! {
                "b-1:9092".to_owned() => "dns-1".to_owned(),
                "b-2:9092".to_owned() => "dns-2".to_owned(),
            };
            assert_eq!(k.privatelink_common.broker_rewrite_map, Some(btreemap));
        } else {
            panic!("extract kafka config failed");
        }
    }

    #[test]
    fn test_extract_cdc_properties() {
        let user_props_mysql = convert_args!(btreemap!(
            "connector" => "mysql-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "3306",
            "database.user" => "root",
            "database.password" => "123456",
            "database.name" => "mydb",
            "table.name" => "products",
        ));

        let user_props_postgres = convert_args!(btreemap!(
            "connector" => "postgres-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "5432",
            "database.user" => "root",
            "database.password" => "654321",
            "schema.name" => "public",
            "database.name" => "mypgdb",
            "table.name" => "orders",
        ));

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_mysql),
            true,
        )
        .unwrap();
        if let ConnectorProperties::MysqlCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "3306");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "123456");
            assert_eq!(c.properties.get("database.name").unwrap(), "mydb");
            assert_eq!(c.properties.get("table.name").unwrap(), "products");
        } else {
            panic!("extract cdc config failed");
        }

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_postgres),
            true,
        )
        .unwrap();
        if let ConnectorProperties::PostgresCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "5432");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "654321");
            assert_eq!(c.properties.get("schema.name").unwrap(), "public");
            assert_eq!(c.properties.get("database.name").unwrap(), "mypgdb");
            assert_eq!(c.properties.get("table.name").unwrap(), "orders");
        } else {
            panic!("extract cdc config failed");
        }
    }
}