risingwave_connector/source/base.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use rw_futures_util::select_all;
use serde::de::DeserializeOwned;
use serde_json::json;
use tokio::sync::mpsc;

use super::cdc::DebeziumCdcMeta;
use super::datagen::DatagenMeta;
use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::kinesis::KinesisMeta;
use super::monitor::SourceMetrics;
use super::nats::source::NatsMeta;
use super::nexmark::source::message::NexmarkMeta;
use super::pulsar::source::PulsarMeta;
use super::{AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use crate::error::ConnectorResult as Result;
use crate::parser::ParserConfig;
use crate::parser::schema_change::SchemaChangeEnvelope;
use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
use crate::source::monitor::EnumeratorMetrics;
use crate::with_options::WithOptions;
use crate::{
    WithOptionsSecResolved, dispatch_source_prop, dispatch_split_impl, for_all_connections,
    for_all_sources, impl_connection, impl_connector_properties, impl_split, match_source_name_str,
};

const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

pub const WEBHOOK_CONNECTOR: &str = "webhook";

pub trait TryFromBTreeMap: Sized + UnknownFields {
    /// Used to initialize the source properties from the raw untyped `WITH` options.
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self>;
}

/// Represents `WITH` options for sources.
///
/// Each instance should add a `#[derive(with_options::WithOptions)]` marker.
pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug {
    const SOURCE_NAME: &'static str;
    type Split: SplitMetaData
        + TryFrom<SplitImpl, Error = crate::error::ConnectorError>
        + Into<SplitImpl>;
    type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
    type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

    /// Load additional info from `PbSource`. Currently only used by CDC.
    fn init_from_pb_source(&mut self, _source: &PbSource) {}

    /// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
    fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}

pub trait UnknownFields {
    /// Unrecognized fields in the `WITH` clause.
    fn unknown_fields(&self) -> HashMap<String, String>;
}

impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let json_value = serde_json::to_value(props)?;
        let res = serde_json::from_value::<P>(json_value)?;

        if !deny_unknown_fields || res.unknown_fields().is_empty() {
            Ok(res)
        } else {
            bail!(
                "Unknown fields in the WITH clause: {:?}",
                res.unknown_fields()
            )
        }
    }
}
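
// A minimal sketch of how this blanket impl behaves, assuming a hypothetical
// `DemoProps` type that collects unrecognized keys via `#[serde(flatten)]`:
//
//     #[derive(serde::Deserialize)]
//     struct DemoProps {
//         topic: String,
//         #[serde(flatten)]
//         unknown: HashMap<String, String>,
//     }
//
//     impl UnknownFields for DemoProps {
//         fn unknown_fields(&self) -> HashMap<String, String> {
//             self.unknown.clone()
//         }
//     }
//
//     // The `WITH` options are round-tripped through JSON. With
//     // `deny_unknown_fields = true`, a stray key such as "topicc" would be
//     // reported as an unknown field instead of being silently dropped.
//     let props = BTreeMap::from([("topic".to_owned(), "demo".to_owned())]);
//     let demo = DemoProps::try_from_btreemap(props, true)?;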

#[derive(Default)]
pub struct CreateSplitReaderOpt {
    pub support_multiple_splits: bool,
    pub seek_to_latest: bool,
}

#[derive(Default)]
pub struct CreateSplitReaderResult {
    pub latest_splits: Option<Vec<SplitImpl>>,
    pub backfill_info: HashMap<SplitId, BackfillInfo>,
}

pub async fn create_split_readers<P: SourceProperties>(
    prop: P,
    splits: Vec<SplitImpl>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
    opt: CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> {
    let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
    let mut res = CreateSplitReaderResult {
        backfill_info: HashMap::new(),
        latest_splits: None,
    };
    if opt.support_multiple_splits {
        let mut reader = P::SplitReader::new(
            prop.clone(),
            splits,
            parser_config.clone(),
            source_ctx.clone(),
            columns.clone(),
        )
        .await?;
        if opt.seek_to_latest {
            res.latest_splits = Some(reader.seek_to_latest().await?);
        }
        res.backfill_info = reader.backfill_info();
        Ok((reader.into_stream().boxed(), res))
    } else {
        let mut readers = try_join_all(splits.into_iter().map(|split| {
            // TODO: is this reader split across multiple threads...? Realistically, we want
            // source_ctx to live in a single actor.
            P::SplitReader::new(
                prop.clone(),
                vec![split],
                parser_config.clone(),
                source_ctx.clone(),
                columns.clone(),
            )
        }))
        .await?;
        if opt.seek_to_latest {
            let mut latest_splits = vec![];
            for reader in &mut readers {
                latest_splits.extend(reader.seek_to_latest().await?);
            }
            res.latest_splits = Some(latest_splits);
        }
        res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
        Ok((
            select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
            res,
        ))
    }
}
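
// A minimal usage sketch of `create_split_readers` (the `props`, `splits`, and
// `parser_config` bindings are assumed to be prepared by the caller):
//
//     let (mut stream, result) = create_split_readers(
//         props,
//         splits,
//         parser_config,
//         Arc::new(SourceContext::dummy()),
//         None,
//         CreateSplitReaderOpt::default(),
//     )
//     .await?;
//     // `seek_to_latest` was not requested, so no latest splits are reported.
//     assert!(result.latest_splits.is_none());
//     while let Some(chunk) = stream.next().await {
//         let chunk: StreamChunk = chunk?;
//         // ... hand the parsed chunk to the downstream executor ...
//     }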

/// [`SplitEnumerator`] fetches the split metadata from the external source service.
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
#[async_trait]
pub trait SplitEnumerator: Sized + Send {
    type Split: SplitMetaData + Send;
    type Properties;

    async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
    -> Result<Self>;
    async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
    /// Do some cleanup work when a fragment is dropped, e.g., drop Kafka consumer group.
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
    /// Do some cleanup work when a backfill fragment is finished, e.g., drop Kafka consumer group.
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
}

pub type SourceContextRef = Arc<SourceContext>;
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;

/// Dyn-compatible [`SplitEnumerator`].
#[async_trait]
pub trait AnySplitEnumerator: Send {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
}

#[async_trait]
impl<T: SplitEnumerator<Split: Into<SplitImpl>>> AnySplitEnumerator for T {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
        SplitEnumerator::list_splits(self)
            .await
            .map(|s| s.into_iter().map(|s| s.into()).collect())
    }

    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_drop_fragments(self, _fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_finish_backfill(self, _fragment_ids).await
    }
}

/// The max size of a chunk yielded by source stream.
pub const MAX_CHUNK_SIZE: usize = 1024;

#[derive(Debug, Clone, Copy)]
pub struct SourceCtrlOpts {
    /// The max size of a chunk yielded by source stream.
    pub chunk_size: usize,
    /// Whether to allow splitting a transaction into multiple chunks to meet the `max_chunk_size`.
    pub split_txn: bool,
}

// The options in `SourceCtrlOpts` are so important that we don't want to impl `Default` for it,
// so that we can prevent any unintentional use of the default value.
impl !Default for SourceCtrlOpts {}

impl SourceCtrlOpts {
    #[cfg(test)]
    pub fn for_test() -> Self {
        SourceCtrlOpts {
            chunk_size: 256,
            split_txn: false,
        }
    }
}

#[derive(Debug)]
pub struct SourceEnumeratorContext {
    pub info: SourceEnumeratorInfo,
    pub metrics: Arc<EnumeratorMetrics>,
}

impl SourceEnumeratorContext {
    /// Create a dummy `SourceEnumeratorContext` for testing purpose, or for the situation
    /// where the real context doesn't matter.
    pub fn dummy() -> SourceEnumeratorContext {
        SourceEnumeratorContext {
            info: SourceEnumeratorInfo { source_id: 0 },
            metrics: Arc::new(EnumeratorMetrics::default()),
        }
    }
}

#[derive(Clone, Debug)]
pub struct SourceEnumeratorInfo {
    pub source_id: u32,
}

#[derive(Debug, Clone)]
pub struct SourceContext {
    pub actor_id: u32,
    pub source_id: TableId,
    pub fragment_id: u32,
    pub source_name: String,
    pub metrics: Arc<SourceMetrics>,
    pub source_ctrl_opts: SourceCtrlOpts,
    pub connector_props: ConnectorProperties,
    // The source parser puts schema change events into this channel.
    pub schema_change_tx:
        Option<mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>>,
}

impl SourceContext {
    pub fn new(
        actor_id: u32,
        source_id: TableId,
        fragment_id: u32,
        source_name: String,
        metrics: Arc<SourceMetrics>,
        source_ctrl_opts: SourceCtrlOpts,
        connector_props: ConnectorProperties,
        schema_change_channel: Option<
            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
        >,
    ) -> Self {
        Self {
            actor_id,
            source_id,
            fragment_id,
            source_name,
            metrics,
            source_ctrl_opts,
            connector_props,
            schema_change_tx: schema_change_channel,
        }
    }

    /// Create a dummy `SourceContext` for testing purpose, or for the situation
    /// where the real context doesn't matter.
    pub fn dummy() -> Self {
        Self::new(
            0,
            TableId::new(0),
            0,
            "dummy".to_owned(),
            Arc::new(SourceMetrics::default()),
            SourceCtrlOpts {
                chunk_size: MAX_CHUNK_SIZE,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        )
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceFormat {
    #[default]
    Invalid,
    Native,
    None,
    Debezium,
    DebeziumMongo,
    Maxwell,
    Canal,
    Upsert,
    Plain,
}

/// Refer to [`crate::parser::EncodingProperties`]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceEncode {
    #[default]
    Invalid,
    Native,
    None,
    Avro,
    Csv,
    Protobuf,
    Json,
    Bytes,
    Parquet,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SourceStruct {
    pub format: SourceFormat,
    pub encode: SourceEncode,
}

impl SourceStruct {
    pub fn new(format: SourceFormat, encode: SourceEncode) -> Self {
        Self { format, encode }
    }
}

// Only returns a valid (format, encode) combination.
pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};

    // old version meta.
    if let Ok(format) = info.get_row_format() {
        let (format, encode) = match format {
            RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
            RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
            RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
            RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
            RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
            RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
            RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
            RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
            RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
            RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
            RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
            RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json),
            RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
            RowFormatType::RowUnspecified => unreachable!(),
        };
        return Ok(SourceStruct::new(format, encode));
    }
    let source_format = info.get_format()?;
    let source_encode = info.get_row_encode()?;
    let (format, encode) = match (source_format, source_encode) {
        (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Protobuf) => {
            (SourceFormat::Plain, SourceEncode::Protobuf)
        }
        (PbFormatType::Debezium, PbEncodeType::Json) => {
            (SourceFormat::Debezium, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro),
        (PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json),
        (PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv),
        (PbFormatType::Plain, PbEncodeType::Parquet) => {
            (SourceFormat::Plain, SourceEncode::Parquet)
        }
        (PbFormatType::Native, PbEncodeType::Native) => {
            (SourceFormat::Native, SourceEncode::Native)
        }
        (PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
        (PbFormatType::Debezium, PbEncodeType::Avro) => {
            (SourceFormat::Debezium, SourceEncode::Avro)
        }
        (PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json),
        (PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro),
        (PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
            (SourceFormat::DebeziumMongo, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes),
        (PbFormatType::Upsert, PbEncodeType::Protobuf) => {
            (SourceFormat::Upsert, SourceEncode::Protobuf)
        }
        (format, encode) => {
            bail!(
                "Unsupported combination of format {:?} and encode {:?}",
                format,
                encode
            );
        }
    };
    Ok(SourceStruct::new(format, encode))
}

/// Stream of [`SourceMessage`]. Messages flow through the stream in the unit of a batch.
pub type BoxSourceMessageStream =
    BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
/// Stream of [`StreamChunk`]s parsed from the messages from the external source.
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
pub type BoxSourceChunkWithStateStream =
    BoxStream<'static, crate::error::ConnectorResult<StreamChunkWithState>>;

/// Stream of [`Option<StreamChunk>`]s parsed from the messages from the external source.
pub type BoxStreamingFileSourceChunkStream =
    BoxStream<'static, crate::error::ConnectorResult<Option<StreamChunk>>>;

// Manually expand the trait alias to improve IDE experience.
pub trait SourceChunkStream:
    Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}
impl<T> SourceChunkStream for T where
    T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}

pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;

/// [`SplitReader`] is the abstraction of the external connector read interface. It is
/// responsible for parsing: it reads messages from the external source and transforms
/// them into a stream of parsed [`StreamChunk`]s.
#[async_trait]
pub trait SplitReader: Sized + Send {
    type Properties;
    type Split: SplitMetaData;

    async fn new(
        properties: Self::Properties,
        state: Vec<Self::Split>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> crate::error::ConnectorResult<Self>;

    fn into_stream(self) -> BoxSourceChunkStream;

    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        HashMap::new()
    }

    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        Err(anyhow!("seek_to_latest is not supported for this connector").into())
    }
}

/// Information used to determine whether we should start and finish source backfill.
///
/// XXX: if a connector cannot provide the latest offsets (but we want to make it shareable),
/// perhaps we should ban blocking DDL for it.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
    HasDataToBackfill {
        /// The last available offsets for each split (**inclusive**).
        ///
        /// This will be used to determine whether source backfill is finished when
        /// there are no _new_ messages coming from upstream `SourceExecutor`. Otherwise,
        /// blocking DDL cannot finish until new messages come.
        ///
        /// When there are upstream messages, we will use the latest offsets from the upstream.
        latest_offset: String,
    },
    /// If there are no messages in the split at all, we don't need to start backfill.
    /// In this case, there will be no messages from the backfill stream either.
    /// If we started backfill, we could not finish it until new messages came in.
    /// So we mark this as a special case for optimization.
    NoDataToBackfill,
}

for_all_sources!(impl_connector_properties);

impl Default for ConnectorProperties {
    fn default() -> Self {
        ConnectorProperties::Test(Box::default())
    }
}

impl ConnectorProperties {
    pub fn is_new_fs_connector_hash_map(with_properties: &HashMap<String, String>) -> bool {
        with_properties
            .get(UPSTREAM_SOURCE_KEY)
            .map(|s| {
                s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
                    || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
                    || s.eq_ignore_ascii_case(GCS_CONNECTOR)
                    || s.eq_ignore_ascii_case(AZBLOB_CONNECTOR)
            })
            .unwrap_or(false)
    }
}

impl ConnectorProperties {
    /// Creates typed source properties from the raw `WITH` properties.
    ///
    /// It checks the `connector` field, and then dispatches to the corresponding type's `try_from_btreemap` method.
    ///
    /// `deny_unknown_fields`: Since `WITH` options are persisted in meta, we do not deny unknown fields when restoring from
    /// existing data to avoid breaking backwards compatibility. We only deny unknown fields when creating new sources.
    pub fn extract(
        with_properties: WithOptionsSecResolved,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let (options, secret_refs) = with_properties.into_parts();
        let mut options_with_secret =
            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
        let connector = options_with_secret
            .remove(UPSTREAM_SOURCE_KEY)
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::try_from_btreemap(options_with_secret, deny_unknown_fields)
                .map(ConnectorProperties::from),
            |other| bail!("connector '{}' is not supported", other)
        )
    }
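
    // A minimal usage sketch of `extract` (option values are illustrative; see also the
    // tests at the bottom of this file):
    //
    //     let raw = maplit::btreemap! {
    //         "connector".to_owned() => "kafka".to_owned(),
    //         "topic".to_owned() => "demo".to_owned(),
    //         "properties.bootstrap.server".to_owned() => "broker:9092".to_owned(),
    //     };
    //     let props =
    //         ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(raw), true)?;
    //     assert!(matches!(props, ConnectorProperties::Kafka(_)));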

    pub fn enable_drop_split(&self) -> bool {
        // Enable split scale-in only for Kinesis and NATS.
571        matches!(
572            self,
573            ConnectorProperties::Kinesis(_) | ConnectorProperties::Nats(_)
574        )
575    }
576
577    /// For most connectors, this should be false. When enabled, RisingWave should not track any progress.
578    pub fn enable_adaptive_splits(&self) -> bool {
579        matches!(self, ConnectorProperties::Nats(_))
580    }
581
582    /// Load additional info from `PbSource`. Currently only used by CDC.
583    pub fn init_from_pb_source(&mut self, source: &PbSource) {
584        dispatch_source_prop!(self, |prop| prop.init_from_pb_source(source))
585    }
586
587    /// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
588    pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
589        dispatch_source_prop!(self, |prop| prop
590            .init_from_pb_cdc_table_desc(cdc_table_desc))
591    }
592
593    pub fn support_multiple_splits(&self) -> bool {
594        matches!(self, ConnectorProperties::Kafka(_))
595            || matches!(self, ConnectorProperties::OpendalS3(_))
596            || matches!(self, ConnectorProperties::Gcs(_))
597            || matches!(self, ConnectorProperties::Azblob(_))
598    }
599
600    pub async fn create_split_enumerator(
601        self,
602        context: crate::source::base::SourceEnumeratorContextRef,
603    ) -> crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
604        let enumerator: Box<dyn AnySplitEnumerator> = dispatch_source_prop!(self, |prop| Box::new(
605            <PropType as SourceProperties>::SplitEnumerator::new(*prop, context).await?
606        ));
607        Ok(enumerator)
608    }
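
    // A minimal usage sketch of `create_split_enumerator` (the `props` binding is assumed
    // to come from `ConnectorProperties::extract`):
    //
    //     let mut enumerator = props
    //         .create_split_enumerator(Arc::new(SourceEnumeratorContext::dummy()))
    //         .await?;
    //     let splits: Vec<SplitImpl> = enumerator.list_splits().await?;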

    pub async fn create_split_reader(
        self,
        splits: Vec<SplitImpl>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
        mut opt: crate::source::CreateSplitReaderOpt,
    ) -> Result<(BoxSourceChunkStream, crate::source::CreateSplitReaderResult)> {
        opt.support_multiple_splits = self.support_multiple_splits();
        tracing::debug!(
            ?splits,
            support_multiple_splits = opt.support_multiple_splits,
            "spawning connector split reader",
        );

        dispatch_source_prop!(self, |prop| create_split_readers(
            *prop,
            splits,
            parser_config,
            source_ctx,
            columns,
            opt
        )
        .await)
    }
}

for_all_sources!(impl_split);
for_all_connections!(impl_connection);

impl From<&SplitImpl> for ConnectorSplit {
    fn from(split: &SplitImpl) -> Self {
        dispatch_split_impl!(split, |inner| {
            ConnectorSplit {
                split_type: String::from(PropType::SOURCE_NAME),
                encoded_split: inner.encode_to_bytes().to_vec(),
            }
        })
    }
}

impl TryFrom<&ConnectorSplit> for SplitImpl {
    type Error = crate::error::ConnectorError;

    fn try_from(split: &ConnectorSplit) -> std::result::Result<Self, Self::Error> {
        let split_type = split.split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            {
                <PropType as SourceProperties>::Split::restore_from_bytes(
                    split.encoded_split.as_ref(),
                )
                .map(Into::into)
            },
            |other| bail!("connector '{}' is not supported", other)
        )
    }
}

impl SplitImpl {
    fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
        let split_type = split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            <PropType as SourceProperties>::Split::restore_from_json(value).map(Into::into),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn is_cdc_split(&self) -> bool {
        matches!(
            self,
            MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) | SqlServerCdc(_)
        )
    }

    /// Get the current split offset.
    pub fn get_cdc_split_offset(&self) -> String {
        match self {
            MysqlCdc(split) => split.start_offset().clone().unwrap_or_default(),
            PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(),
            MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(),
            CitusCdc(split) => split.start_offset().clone().unwrap_or_default(),
            SqlServerCdc(split) => split.start_offset().clone().unwrap_or_default(),
            _ => unreachable!("get_cdc_split_offset() is only for cdc split"),
        }
    }
}

impl SplitMetaData for SplitImpl {
    fn id(&self) -> SplitId {
        dispatch_split_impl!(self, |inner| inner.id())
    }

    fn encode_to_json(&self) -> JsonbVal {
        use serde_json::json;
        let inner = self.encode_to_json_inner().take();
        json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
    }

    fn restore_from_json(value: JsonbVal) -> Result<Self> {
        let mut value = value.take();
        let json_obj = value.as_object_mut().unwrap();
        let split_type = json_obj
            .remove(SPLIT_TYPE_FIELD)
            .unwrap()
            .as_str()
            .unwrap()
            .to_owned();
        let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
        Self::restore_from_json_inner(&split_type, inner_value.into())
    }

    fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset))
    }
}
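
// The JSON envelope produced by `SplitImpl::encode_to_json` looks roughly like the
// following (the `split_type` value is illustrative; `split_info` carries the
// connector-specific split payload):
//
//     {
//         "split_type": "kafka",
//         "split_info": { ... }
//     }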

impl SplitImpl {
    pub fn get_type(&self) -> String {
        dispatch_split_impl!(self, |_inner| PropType::SOURCE_NAME.to_owned())
    }

    pub fn update_in_place(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset)?);
        Ok(())
    }

    pub fn encode_to_json_inner(&self) -> JsonbVal {
        dispatch_split_impl!(self, |inner| inner.encode_to_json())
    }
}

use risingwave_common::types::DataType;

#[derive(Clone, Debug)]
pub struct Column {
    pub name: String,
    pub data_type: DataType,
    /// This field is only used by datagen.
    pub is_visible: bool,
}

/// Split id resides in every source message, use `Arc` to avoid copying.
pub type SplitId = Arc<str>;

/// The message pumped from the external source service.
/// The third-party message structs will eventually be transformed into this struct.
#[derive(Debug, Clone)]
pub struct SourceMessage {
    pub key: Option<Vec<u8>>,
    pub payload: Option<Vec<u8>>,
    pub offset: String, // TODO: use `Arc<str>`
    pub split_id: SplitId,
    pub meta: SourceMeta,
}

impl SourceMessage {
    /// Create a dummy `SourceMessage` with all fields unset for testing purposes.
    pub fn dummy() -> Self {
        Self {
            key: None,
            payload: None,
            offset: "".to_owned(),
            split_id: "".into(),
            meta: SourceMeta::Empty,
        }
    }

    /// Check whether the source message is a CDC heartbeat message.
    pub fn is_cdc_heartbeat(&self) -> bool {
        self.key.is_none() && self.payload.is_none()
    }
}

#[derive(Debug, Clone)]
pub enum SourceMeta {
    Kafka(KafkaMeta),
    Kinesis(KinesisMeta),
    Pulsar(PulsarMeta),
    Nexmark(NexmarkMeta),
    GooglePubsub(GooglePubsubMeta),
    Datagen(DatagenMeta),
    DebeziumCdc(DebeziumCdcMeta),
    Nats(NatsMeta),
    // For sources that don't have metadata.
    Empty,
}

/// Implement `PartialEq` (and `Eq`) manually to ignore the `meta` field.
impl PartialEq for SourceMessage {
    fn eq(&self, other: &Self) -> bool {
        self.offset == other.offset
            && self.split_id == other.split_id
            && self.payload == other.payload
    }
}
impl Eq for SourceMessage {}

/// The metadata of a split.
pub trait SplitMetaData: Sized {
    fn id(&self) -> SplitId;
    fn encode_to_bytes(&self) -> Bytes {
        self.encode_to_json()
            .as_scalar_ref()
            .value_serialize()
            .into()
    }
    fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
        Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
    }

    /// Encode the whole split metadata to a JSON object
    fn encode_to_json(&self) -> JsonbVal;
    fn restore_from_json(value: JsonbVal) -> Result<Self>;
    fn update_offset(&mut self, last_seen_offset: String) -> crate::error::ConnectorResult<()>;
}
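
// A minimal sketch of a connector split implementing `SplitMetaData`, using a
// hypothetical `DemoSplit` type (real splits live in the per-connector modules):
//
//     #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
//     struct DemoSplit {
//         partition: i32,
//         offset: String,
//     }
//
//     impl SplitMetaData for DemoSplit {
//         fn id(&self) -> SplitId {
//             self.partition.to_string().into()
//         }
//
//         fn encode_to_json(&self) -> JsonbVal {
//             serde_json::to_value(self.clone()).unwrap().into()
//         }
//
//         fn restore_from_json(value: JsonbVal) -> Result<Self> {
//             Ok(serde_json::from_value(value.take())?)
//         }
//
//         fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
//             self.offset = last_seen_offset;
//             Ok(())
//         }
//     }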

/// [`ConnectorState`] maintains the consuming splits' info. In specific split readers,
/// `ConnectorState` cannot be [`None`] and contains one (for MQ split readers) or many (for FS
/// split readers) [`SplitImpl`]s. If no split is assigned to the source executor, `ConnectorState` is
/// [`None`] and the created source stream will be a pending stream.
pub type ConnectorState = Option<Vec<SplitImpl>>;

#[cfg(test)]
mod tests {
    use maplit::*;
    use nexmark::event::EventType;

    use super::*;
    use crate::source::cdc::{DebeziumCdcSplit, Mysql};
    use crate::source::kafka::KafkaSplit;

    #[test]
    fn test_split_impl_get_fn() -> Result<()> {
        let split = KafkaSplit::new(0, Some(0), Some(0), "demo".to_owned());
        let split_impl = SplitImpl::Kafka(split.clone());
        let get_value = split_impl.into_kafka().unwrap();
        println!("{:?}", get_value);
        assert_eq!(split.encode_to_bytes(), get_value.encode_to_bytes());
        assert_eq!(split.encode_to_json(), get_value.encode_to_json());

        Ok(())
    }

    #[test]
    fn test_cdc_split_state() -> Result<()> {
        let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
        let split = DebeziumCdcSplit::<Mysql>::new(1001, Some(offset_str.to_owned()), None);
        let split_impl = SplitImpl::MysqlCdc(split);
        let encoded_split = split_impl.encode_to_bytes();
        let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );

        let encoded_split = split_impl.encode_to_json();
        let restored_split_impl = SplitImpl::restore_from_json(encoded_split)?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );
        Ok(())
    }

    #[test]
    fn test_extract_nexmark_config() {
        let props = convert_args!(btreemap!(
            "connector" => "nexmark",
            "nexmark.table.type" => "Person",
            "nexmark.split.num" => "1",
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();

        if let ConnectorProperties::Nexmark(props) = props {
            assert_eq!(props.table_type, Some(EventType::Person));
            assert_eq!(props.split_num, 1);
        } else {
            panic!("extract nexmark config failed");
        }
    }

    #[test]
    fn test_extract_kafka_config() {
        let props = convert_args!(btreemap!(
            "connector" => "kafka",
            "properties.bootstrap.server" => "b1,b2",
            "topic" => "test",
            "scan.startup.mode" => "earliest",
            "broker.rewrite.endpoints" => r#"{"b-1:9092":"dns-1", "b-2:9092":"dns-2"}"#,
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();
        if let ConnectorProperties::Kafka(k) = props {
            let btreemap = btreemap! {
                "b-1:9092".to_owned() => "dns-1".to_owned(),
                "b-2:9092".to_owned() => "dns-2".to_owned(),
            };
            assert_eq!(k.privatelink_common.broker_rewrite_map, Some(btreemap));
        } else {
            panic!("extract kafka config failed");
        }
    }

    #[test]
    fn test_extract_cdc_properties() {
        let user_props_mysql = convert_args!(btreemap!(
            "connector" => "mysql-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "3306",
            "database.user" => "root",
            "database.password" => "123456",
            "database.name" => "mydb",
            "table.name" => "products",
        ));

        let user_props_postgres = convert_args!(btreemap!(
            "connector" => "postgres-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "5432",
            "database.user" => "root",
            "database.password" => "654321",
            "schema.name" => "public",
            "database.name" => "mypgdb",
            "table.name" => "orders",
        ));

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_mysql),
            true,
        )
        .unwrap();
        if let ConnectorProperties::MysqlCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "3306");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "123456");
            assert_eq!(c.properties.get("database.name").unwrap(), "mydb");
            assert_eq!(c.properties.get("table.name").unwrap(), "products");
        } else {
            panic!("extract cdc config failed");
        }

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_postgres),
            true,
        )
        .unwrap();
        if let ConnectorProperties::PostgresCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "5432");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "654321");
            assert_eq!(c.properties.get("schema.name").unwrap(), "public");
            assert_eq!(c.properties.get("database.name").unwrap(), "mypgdb");
        
        } else {
            panic!("extract cdc config failed");
        }
    }
}
986}