risingwave_connector/source/base.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use rw_futures_util::select_all;
use serde::de::DeserializeOwned;
use serde_json::json;
use tokio::sync::mpsc;

use super::cdc::DebeziumCdcMeta;
use super::datagen::DatagenMeta;
use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::kinesis::KinesisMeta;
use super::monitor::SourceMetrics;
use super::nats::source::NatsMeta;
use super::nexmark::source::message::NexmarkMeta;
use super::pulsar::source::PulsarMeta;
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult as Result;
use crate::parser::ParserConfig;
use crate::parser::schema_change::SchemaChangeEnvelope;
use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
use crate::source::batch::BatchSourceSplitImpl;
use crate::source::monitor::EnumeratorMetrics;
use crate::with_options::WithOptions;
use crate::{
    WithOptionsSecResolved, WithPropertiesExt, dispatch_source_prop, dispatch_split_impl,
    for_all_connections, for_all_sources, impl_connection, impl_connector_properties, impl_split,
    match_source_name_str,
};

const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

pub const WEBHOOK_CONNECTOR: &str = "webhook";

/// Callback wrapper for reporting CDC auto schema change fail events
/// Parameters: (`table_id`, `table_name`, `cdc_table_id`, `upstream_ddl`, `fail_info`)
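///
/// # Example
///
/// An illustrative sketch of wrapping a closure; the logging body here is hypothetical,
/// not how the callback is actually wired up elsewhere:
///
/// ```ignore
/// let cb = CdcAutoSchemaChangeFailCallback::new(
///     |table_id, table_name, cdc_table_id, upstream_ddl, fail_info| {
///         tracing::warn!(table_id, %table_name, %cdc_table_id, %upstream_ddl, %fail_info,
///             "CDC auto schema change failed");
///     },
/// );
/// cb.call(1, "t1".into(), "mydb.t1".into(), "ALTER TABLE ...".into(), "reason".into());
/// ```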
#[derive(Clone)]
pub struct CdcAutoSchemaChangeFailCallback(
    Arc<dyn Fn(u32, String, String, String, String) + Send + Sync>,
);

impl CdcAutoSchemaChangeFailCallback {
    pub fn new<F>(f: F) -> Self
    where
        F: Fn(u32, String, String, String, String) + Send + Sync + 'static,
    {
        Self(Arc::new(f))
    }

    pub fn call(
        &self,
        table_id: u32,
        table_name: String,
        cdc_table_id: String,
        upstream_ddl: String,
        fail_info: String,
    ) {
        self.0(table_id, table_name, cdc_table_id, upstream_ddl, fail_info);
    }
}

impl std::fmt::Debug for CdcAutoSchemaChangeFailCallback {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("CdcAutoSchemaChangeFailCallback")
    }
}

pub trait TryFromBTreeMap: Sized + UnknownFields {
    /// Used to initialize the source properties from the raw untyped `WITH` options.
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self>;
}

/// Represents `WITH` options for sources.
///
/// Each instance should add a `#[derive(with_options::WithOptions)]` marker.
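///
/// # Example
///
/// An illustrative sketch of how a connector wires its types together; the `My*`
/// names are placeholders rather than types defined in this crate:
///
/// ```ignore
/// impl SourceProperties for MySourceProperties {
///     const SOURCE_NAME: &'static str = "my-source";
///     type Split = MySplit;
///     type SplitEnumerator = MySplitEnumerator;
///     type SplitReader = MySplitReader;
/// }
/// ```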
pub trait SourceProperties:
    TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug + EnforceSecret
{
    const SOURCE_NAME: &'static str;
    type Split: SplitMetaData
        + TryFrom<SplitImpl, Error = crate::error::ConnectorError>
        + Into<SplitImpl>;
    type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
    type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

    /// Load additional info from `PbSource`. Currently only used by CDC.
    fn init_from_pb_source(&mut self, _source: &PbSource) {}

    /// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
    fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}

pub trait UnknownFields {
    /// Unrecognized fields in the `WITH` clause.
    fn unknown_fields(&self) -> HashMap<String, String>;
}

impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let json_value = serde_json::to_value(props)?;
        let res = serde_json::from_value::<P>(json_value)?;

        if !deny_unknown_fields || res.unknown_fields().is_empty() {
            Ok(res)
        } else {
            bail!(
                "Unknown fields in the WITH clause: {:?}",
                res.unknown_fields()
            )
        }
    }
}

#[derive(Default)]
pub struct CreateSplitReaderOpt {
    pub support_multiple_splits: bool,
    pub seek_to_latest: bool,
}

#[derive(Default)]
pub struct CreateSplitReaderResult {
    pub latest_splits: Option<Vec<SplitImpl>>,
    pub backfill_info: HashMap<SplitId, BackfillInfo>,
}

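/// Creates split readers for the given splits: a single reader for all splits when
/// `opt.support_multiple_splits` is set, otherwise one reader per split. The readers'
/// chunk streams are merged into one [`BoxSourceChunkStream`], and the latest splits
/// and backfill info are collected into a [`CreateSplitReaderResult`].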
pub async fn create_split_readers<P: SourceProperties>(
    prop: P,
    splits: Vec<SplitImpl>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
    opt: CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> {
    let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
    let mut res = CreateSplitReaderResult {
        backfill_info: HashMap::new(),
        latest_splits: None,
    };
    if opt.support_multiple_splits {
        let mut reader = P::SplitReader::new(
            prop.clone(),
            splits,
            parser_config.clone(),
            source_ctx.clone(),
            columns.clone(),
        )
        .await?;
        if opt.seek_to_latest {
            res.latest_splits = Some(reader.seek_to_latest().await?);
        }
        res.backfill_info = reader.backfill_info();
        Ok((reader.into_stream().boxed(), res))
    } else {
        let mut readers = try_join_all(splits.into_iter().map(|split| {
            // TODO: is this reader split across multiple threads...? Realistically, we want
            // source_ctx to live in a single actor.
            P::SplitReader::new(
                prop.clone(),
                vec![split],
                parser_config.clone(),
                source_ctx.clone(),
                columns.clone(),
            )
        }))
        .await?;
        if opt.seek_to_latest {
            let mut latest_splits = vec![];
            for reader in &mut readers {
                latest_splits.extend(reader.seek_to_latest().await?);
            }
            res.latest_splits = Some(latest_splits);
        }
        res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
        Ok((
            select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
            res,
        ))
    }
}

/// [`SplitEnumerator`] fetches the split metadata from the external source service.
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
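///
/// # Example
///
/// An illustrative sketch of how an enumerator might be driven; `MySplitEnumerator`
/// and `props` are placeholders:
///
/// ```ignore
/// let mut enumerator = MySplitEnumerator::new(props, context).await?;
/// let splits = enumerator.list_splits().await?;
/// // assign `splits` to the source actors...
/// ```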
#[async_trait]
pub trait SplitEnumerator: Sized + Send {
    type Split: SplitMetaData + Send;
    type Properties;

    async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
    -> Result<Self>;
    async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
    /// Do some cleanup work when a fragment is dropped, e.g., drop Kafka consumer group.
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
    /// Do some cleanup work when a backfill fragment is finished, e.g., drop Kafka consumer group.
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
    /// Called after `worker.tick()` execution to perform periodic operations,
    /// such as monitoring upstream PostgreSQL `confirmed_flush_lsn`, etc.
    /// This can be extended to support more periodic operations in the future.
    async fn on_tick(&mut self) -> Result<()> {
        Ok(())
    }
}

pub type SourceContextRef = Arc<SourceContext>;
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;

/// Dyn-compatible [`SplitEnumerator`].
#[async_trait]
pub trait AnySplitEnumerator: Send {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
    async fn on_tick(&mut self) -> Result<()>;
}

#[async_trait]
impl<T: SplitEnumerator<Split: Into<SplitImpl>> + 'static> AnySplitEnumerator for T {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
        SplitEnumerator::list_splits(self)
            .await
            .map(|s| s.into_iter().map(|s| s.into()).collect())
    }

    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_drop_fragments(self, _fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_finish_backfill(self, _fragment_ids).await
    }

    async fn on_tick(&mut self) -> Result<()> {
        SplitEnumerator::on_tick(self).await
    }
}

/// The max size of a chunk yielded by source stream.
pub const MAX_CHUNK_SIZE: usize = 1024;

#[derive(Debug, Clone, Copy)]
pub struct SourceCtrlOpts {
    /// The max size of a chunk yielded by source stream.
    pub chunk_size: usize,
    /// Whether to allow splitting a transaction into multiple chunks to meet the `max_chunk_size`.
    pub split_txn: bool,
}

// The options in `SourceCtrlOpts` are so important that we don't want to impl `Default` for it,
// so that we can prevent any unintentional use of the default value.
impl !Default for SourceCtrlOpts {}

impl SourceCtrlOpts {
    #[cfg(test)]
    pub fn for_test() -> Self {
        SourceCtrlOpts {
            chunk_size: 256,
            split_txn: false,
        }
    }
}

#[derive(Debug)]
pub struct SourceEnumeratorContext {
    pub info: SourceEnumeratorInfo,
    pub metrics: Arc<EnumeratorMetrics>,
}

impl SourceEnumeratorContext {
    /// Create a dummy `SourceEnumeratorContext` for testing purposes, or for situations
    /// where the real context doesn't matter.
    pub fn dummy() -> SourceEnumeratorContext {
        SourceEnumeratorContext {
            info: SourceEnumeratorInfo { source_id: 0 },
            metrics: Arc::new(EnumeratorMetrics::default()),
        }
    }
}

#[derive(Clone, Debug)]
pub struct SourceEnumeratorInfo {
    pub source_id: u32,
}

#[derive(Clone, Debug)]
pub struct SourceContext {
    pub actor_id: u32,
    pub source_id: TableId,
    pub fragment_id: u32,
    pub source_name: String,
    pub metrics: Arc<SourceMetrics>,
    pub source_ctrl_opts: SourceCtrlOpts,
    pub connector_props: ConnectorProperties,
    // The source parser puts schema change events into this channel.
    pub schema_change_tx:
        Option<mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>>,
    // Callback to report CDC auto schema change failure events.
    pub on_cdc_auto_schema_change_failure: Option<CdcAutoSchemaChangeFailCallback>,
}

impl SourceContext {
    pub fn new(
        actor_id: u32,
        source_id: TableId,
        fragment_id: u32,
        source_name: String,
        metrics: Arc<SourceMetrics>,
        source_ctrl_opts: SourceCtrlOpts,
        connector_props: ConnectorProperties,
        schema_change_channel: Option<
            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
        >,
    ) -> Self {
        Self::new_with_auto_schema_change_callback(
            actor_id,
            source_id,
            fragment_id,
            source_name,
            metrics,
            source_ctrl_opts,
            connector_props,
            schema_change_channel,
            None,
        )
    }

    pub fn new_with_auto_schema_change_callback(
        actor_id: u32,
        source_id: TableId,
        fragment_id: u32,
        source_name: String,
        metrics: Arc<SourceMetrics>,
        source_ctrl_opts: SourceCtrlOpts,
        connector_props: ConnectorProperties,
        schema_change_channel: Option<
            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
        >,
        on_cdc_auto_schema_change_failure: Option<CdcAutoSchemaChangeFailCallback>,
    ) -> Self {
        Self {
            actor_id,
            source_id,
            fragment_id,
            source_name,
            metrics,
            source_ctrl_opts,
            connector_props,
            schema_change_tx: schema_change_channel,
            on_cdc_auto_schema_change_failure,
        }
    }

    /// Create a dummy `SourceContext` for testing purposes, or for situations
    /// where the real context doesn't matter.
    pub fn dummy() -> Self {
        Self::new(
            0,
            TableId::new(0),
            0,
            "dummy".to_owned(),
            Arc::new(SourceMetrics::default()),
            SourceCtrlOpts {
                chunk_size: MAX_CHUNK_SIZE,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        )
    }

    /// Report CDC auto schema change fail event
    /// Parameters: (`table_id`, `table_name`, `cdc_table_id`, `upstream_ddl`, `fail_info`)
    pub fn on_cdc_auto_schema_change_failure(
        &self,
        table_id: u32,
        table_name: String,
        cdc_table_id: String,
        upstream_ddl: String,
        fail_info: String,
    ) {
        if let Some(ref cdc_auto_schema_change_fail_callback) =
            self.on_cdc_auto_schema_change_failure
        {
            cdc_auto_schema_change_fail_callback.call(
                table_id,
                table_name,
                cdc_table_id,
                upstream_ddl,
                fail_info,
            );
        }
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceFormat {
    #[default]
    Invalid,
    Native,
    None,
    Debezium,
    DebeziumMongo,
    Maxwell,
    Canal,
    Upsert,
    Plain,
}

/// Refer to [`crate::parser::EncodingProperties`]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceEncode {
    #[default]
    Invalid,
    Native,
    None,
    Avro,
    Csv,
    Protobuf,
    Json,
    Bytes,
    Parquet,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SourceStruct {
    pub format: SourceFormat,
    pub encode: SourceEncode,
}

impl SourceStruct {
    pub fn new(format: SourceFormat, encode: SourceEncode) -> Self {
        Self { format, encode }
    }
}

// Only returns a valid (format, encode) combination.
pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};

    // old version meta.
    if let Ok(format) = info.get_row_format() {
        let (format, encode) = match format {
            RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
            RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
            RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
            RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
            RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
            RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
            RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
            RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
            RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
            RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
            RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
            RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json),
            RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
            RowFormatType::RowUnspecified => unreachable!(),
        };
        return Ok(SourceStruct::new(format, encode));
    }
    let source_format = info.get_format()?;
    let source_encode = info.get_row_encode()?;
    let (format, encode) = match (source_format, source_encode) {
        (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Protobuf) => {
            (SourceFormat::Plain, SourceEncode::Protobuf)
        }
        (PbFormatType::Debezium, PbEncodeType::Json) => {
            (SourceFormat::Debezium, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro),
        (PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json),
        (PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv),
        (PbFormatType::Plain, PbEncodeType::Parquet) => {
            (SourceFormat::Plain, SourceEncode::Parquet)
        }
        (PbFormatType::Native, PbEncodeType::Native) => {
            (SourceFormat::Native, SourceEncode::Native)
        }
        (PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
        (PbFormatType::Debezium, PbEncodeType::Avro) => {
            (SourceFormat::Debezium, SourceEncode::Avro)
        }
        (PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json),
        (PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro),
        (PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
            (SourceFormat::DebeziumMongo, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes),
        (PbFormatType::Upsert, PbEncodeType::Protobuf) => {
            (SourceFormat::Upsert, SourceEncode::Protobuf)
        }
        (format, encode) => {
            bail!(
                "Unsupported combination of format {:?} and encode {:?}",
                format,
                encode
            );
        }
    };
    Ok(SourceStruct::new(format, encode))
}

/// Stream of [`SourceMessage`]. Messages flow through the stream in the unit of a batch.
pub type BoxSourceMessageStream =
    BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
/// Stream of [`StreamChunk`]s parsed from the messages from the external source.
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
/// `StreamChunk` with the latest split state.
/// The state is constructed in `StreamReaderBuilder::into_retry_stream`
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
/// See [`StreamChunkWithState`].
pub type BoxSourceChunkWithStateStream =
    BoxStream<'static, crate::error::ConnectorResult<StreamChunkWithState>>;

/// Stream of [`Option<StreamChunk>`]s parsed from the messages from the external source.
pub type BoxStreamingFileSourceChunkStream =
    BoxStream<'static, crate::error::ConnectorResult<Option<StreamChunk>>>;

// Manually expand the trait alias to improve IDE experience.
pub trait SourceChunkStream:
    Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}
impl<T> SourceChunkStream for T where
    T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}

pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;

/// [`SplitReader`] is a new abstraction of the external connector read interface. It is
/// responsible for parsing: it reads messages from the outside and transforms them into a
/// stream of parsed [`StreamChunk`]s.
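///
/// # Example
///
/// An illustrative sketch of constructing a reader and consuming its chunk stream;
/// `MySplitReader`, `props`, and `splits` are placeholders:
///
/// ```ignore
/// let reader = MySplitReader::new(props, splits, parser_config, source_ctx, None).await?;
/// let mut stream = reader.into_stream();
/// while let Some(chunk) = stream.next().await {
///     let chunk: StreamChunk = chunk?;
///     // process the parsed chunk...
/// }
/// ```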
#[async_trait]
pub trait SplitReader: Sized + Send {
    type Properties;
    type Split: SplitMetaData;

    async fn new(
        properties: Self::Properties,
        state: Vec<Self::Split>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> crate::error::ConnectorResult<Self>;

    fn into_stream(self) -> BoxSourceChunkStream;

    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        HashMap::new()
    }

    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        Err(anyhow!("seek_to_latest is not supported for this connector").into())
    }
}

/// Information used to determine whether we should start and finish source backfill.
///
/// XXX: if a connector cannot provide the latest offsets (but we want to make it shareable),
/// perhaps we should ban blocking DDL for it.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
    HasDataToBackfill {
        /// The last available offsets for each split (**inclusive**).
        ///
        /// This will be used to determine whether source backfill is finished when
        /// there are no _new_ messages coming from upstream `SourceExecutor`. Otherwise,
        /// blocking DDL cannot finish until new messages come.
        ///
        /// When there are upstream messages, we will use the latest offsets from the upstream.
        latest_offset: String,
    },
    /// If there are no messages in the split at all, we don't need to start backfill.
    /// In this case, there will be no message from the backfill stream either.
    /// If we started backfill, we could not finish it until new messages came.
    /// So we mark this as a special case for optimization.
    NoDataToBackfill,
}

for_all_sources!(impl_connector_properties);

impl Default for ConnectorProperties {
    fn default() -> Self {
        ConnectorProperties::Test(Box::default())
    }
}

impl ConnectorProperties {
    /// Creates typed source properties from the raw `WITH` properties.
    ///
    /// It checks the `connector` field, and then dispatches to the corresponding type's `try_from_btreemap` method.
    ///
    /// `deny_unknown_fields`: Since `WITH` options are persisted in meta, we do not deny unknown fields when restoring from
    /// existing data to avoid breaking backwards compatibility. We only deny unknown fields when creating new sources.
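    ///
    /// # Example
    ///
    /// An illustrative sketch, mirroring the tests at the bottom of this file:
    ///
    /// ```ignore
    /// let props: BTreeMap<String, String> = btreemap! {
    ///     "connector".to_owned() => "kafka".to_owned(),
    ///     "properties.bootstrap.server".to_owned() => "b1,b2".to_owned(),
    ///     "topic".to_owned() => "test".to_owned(),
    /// };
    /// let props =
    ///     ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)?;
    /// ```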
    pub fn extract(
        with_properties: WithOptionsSecResolved,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let (options, secret_refs) = with_properties.into_parts();
        let mut options_with_secret =
            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
        let connector = options_with_secret
            .remove(UPSTREAM_SOURCE_KEY)
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::try_from_btreemap(options_with_secret, deny_unknown_fields)
                .map(ConnectorProperties::from),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn enforce_secret_source(
        with_properties: &impl WithPropertiesExt,
    ) -> crate::error::ConnectorResult<()> {
        let connector = with_properties
            .get_connector()
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        let key_iter = with_properties.key_iter();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::enforce_secret(key_iter),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn enable_drop_split(&self) -> bool {
        // Enable split scale-in only for Kinesis and NATS.
        matches!(
            self,
            ConnectorProperties::Kinesis(_) | ConnectorProperties::Nats(_)
        )
    }

    /// For most connectors, this should be false. When enabled, RisingWave should not track any progress.
    pub fn enable_adaptive_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Nats(_))
    }

    /// Load additional info from `PbSource`. Currently only used by CDC.
    pub fn init_from_pb_source(&mut self, source: &PbSource) {
        dispatch_source_prop!(self, |prop| prop.init_from_pb_source(source))
    }

    /// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
    pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
        dispatch_source_prop!(self, |prop| prop
            .init_from_pb_cdc_table_desc(cdc_table_desc))
    }

    pub fn support_multiple_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Kafka(_))
            || matches!(self, ConnectorProperties::OpendalS3(_))
            || matches!(self, ConnectorProperties::Gcs(_))
            || matches!(self, ConnectorProperties::Azblob(_))
    }

    pub async fn create_split_enumerator(
        self,
        context: crate::source::base::SourceEnumeratorContextRef,
    ) -> crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
        let enumerator: Box<dyn AnySplitEnumerator> = dispatch_source_prop!(self, |prop| Box::new(
            <PropType as SourceProperties>::SplitEnumerator::new(*prop, context).await?
        ));
        Ok(enumerator)
    }

    pub async fn create_split_reader(
        self,
        splits: Vec<SplitImpl>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
        mut opt: crate::source::CreateSplitReaderOpt,
    ) -> Result<(BoxSourceChunkStream, crate::source::CreateSplitReaderResult)> {
        opt.support_multiple_splits = self.support_multiple_splits();
        tracing::debug!(
            ?splits,
            support_multiple_splits = opt.support_multiple_splits,
            "spawning connector split reader",
        );

        dispatch_source_prop!(self, |prop| create_split_readers(
            *prop,
            splits,
            parser_config,
            source_ctx,
            columns,
            opt
        )
        .await)
    }
}

for_all_sources!(impl_split);
for_all_connections!(impl_connection);

impl From<&SplitImpl> for ConnectorSplit {
    fn from(split: &SplitImpl) -> Self {
        dispatch_split_impl!(split, |inner| {
            ConnectorSplit {
                split_type: String::from(PropType::SOURCE_NAME),
                encoded_split: inner.encode_to_bytes().to_vec(),
            }
        })
    }
}

impl TryFrom<&ConnectorSplit> for SplitImpl {
    type Error = crate::error::ConnectorError;

    fn try_from(split: &ConnectorSplit) -> std::result::Result<Self, Self::Error> {
        let split_type = split.split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            {
                <PropType as SourceProperties>::Split::restore_from_bytes(
                    split.encoded_split.as_ref(),
                )
                .map(Into::into)
            },
            |other| bail!("connector '{}' is not supported", other)
        )
    }
}

impl SplitImpl {
    fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
        let split_type = split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            <PropType as SourceProperties>::Split::restore_from_json(value).map(Into::into),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn is_cdc_split(&self) -> bool {
        matches!(
            self,
            MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) | SqlServerCdc(_)
        )
    }

    /// Get the current split offset.
    pub fn get_cdc_split_offset(&self) -> String {
        match self {
            MysqlCdc(split) => split.start_offset().clone().unwrap_or_default(),
            PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(),
            MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(),
            CitusCdc(split) => split.start_offset().clone().unwrap_or_default(),
            SqlServerCdc(split) => split.start_offset().clone().unwrap_or_default(),
            _ => unreachable!("get_cdc_split_offset() is only for cdc split"),
        }
    }

    pub fn into_batch_split(self) -> Option<BatchSourceSplitImpl> {
        #[expect(clippy::match_single_binding)]
        match self {
            // SplitImpl::BatchPosixFs(batch_posix_fs_split) => {
            //     Some(BatchSourceSplitImpl::BatchPosixFs(batch_posix_fs_split))
            // }
            _ => None,
        }
    }
}

impl SplitMetaData for SplitImpl {
    fn id(&self) -> SplitId {
        dispatch_split_impl!(self, |inner| inner.id())
    }

    fn encode_to_json(&self) -> JsonbVal {
        use serde_json::json;
        let inner = self.encode_to_json_inner().take();
        json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
    }

    fn restore_from_json(value: JsonbVal) -> Result<Self> {
        let mut value = value.take();
        let json_obj = value.as_object_mut().unwrap();
        let split_type = json_obj
            .remove(SPLIT_TYPE_FIELD)
            .unwrap()
            .as_str()
            .unwrap()
            .to_owned();
        let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
        Self::restore_from_json_inner(&split_type, inner_value.into())
    }

    fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset))
    }
}

impl SplitImpl {
    pub fn get_type(&self) -> String {
        dispatch_split_impl!(self, |_inner| PropType::SOURCE_NAME.to_owned())
    }

    pub fn update_in_place(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset)?);
        Ok(())
    }

    pub fn encode_to_json_inner(&self) -> JsonbVal {
        dispatch_split_impl!(self, |inner| inner.encode_to_json())
    }
}

use risingwave_common::types::DataType;

#[derive(Clone, Debug)]
pub struct Column {
    pub name: String,
    pub data_type: DataType,
    /// This field is only used by datagen.
    pub is_visible: bool,
}

/// Split id resides in every source message, so we use `Arc` to avoid copying.
pub type SplitId = Arc<str>;

/// The message pumped from the external source service.
/// The third-party message structs will eventually be transformed into this struct.
#[derive(Debug, Clone)]
pub struct SourceMessage {
    pub key: Option<Vec<u8>>,
    pub payload: Option<Vec<u8>>,
    pub offset: String, // TODO: use `Arc<str>`
    pub split_id: SplitId,
    pub meta: SourceMeta,
}

impl SourceMessage {
    /// Create a dummy `SourceMessage` with all fields unset for testing purposes.
    pub fn dummy() -> Self {
        Self {
            key: None,
            payload: None,
            offset: "".to_owned(),
            split_id: "".into(),
            meta: SourceMeta::Empty,
        }
    }

    /// Check whether the source message is a CDC heartbeat message.
    pub fn is_cdc_heartbeat(&self) -> bool {
        self.key.is_none() && self.payload.is_none()
    }
}

#[derive(Debug, Clone)]
pub enum SourceMeta {
    Kafka(KafkaMeta),
    Kinesis(KinesisMeta),
    Pulsar(PulsarMeta),
    Nexmark(NexmarkMeta),
    GooglePubsub(GooglePubsubMeta),
    Datagen(DatagenMeta),
    DebeziumCdc(DebeziumCdcMeta),
    Nats(NatsMeta),
    // For sources that don't have metadata.
    Empty,
}

/// Implement `PartialEq`/`Eq` manually to ignore the `meta` field.
impl PartialEq for SourceMessage {
    fn eq(&self, other: &Self) -> bool {
        self.offset == other.offset
            && self.split_id == other.split_id
            && self.payload == other.payload
    }
}
impl Eq for SourceMessage {}

/// The metadata of a split.
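///
/// # Example
///
/// An illustrative round trip through the JSON encoding (mirroring the tests below):
///
/// ```ignore
/// let json = split.encode_to_json();
/// let restored = SplitImpl::restore_from_json(json)?;
/// assert_eq!(split.encode_to_bytes(), restored.encode_to_bytes());
/// ```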
pub trait SplitMetaData: Sized {
    fn id(&self) -> SplitId;
    fn encode_to_bytes(&self) -> Bytes {
        self.encode_to_json()
            .as_scalar_ref()
            .value_serialize()
            .into()
    }
    fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
        Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
    }

    /// Encode the whole split metadata to a JSON object
    fn encode_to_json(&self) -> JsonbVal;
    fn restore_from_json(value: JsonbVal) -> Result<Self>;
    fn update_offset(&mut self, last_seen_offset: String) -> crate::error::ConnectorResult<()>;
}

/// [`ConnectorState`] maintains the consuming splits' info. In specific split readers,
/// `ConnectorState` cannot be [`None`] and contains one (for mq split readers) or many (for fs
/// split readers) [`SplitImpl`]. If no split is assigned to source executor, `ConnectorState` is
/// [`None`] and the created source stream will be a pending stream.
pub type ConnectorState = Option<Vec<SplitImpl>>;

#[cfg(test)]
mod tests {
    use maplit::*;
    use nexmark::event::EventType;

    use super::*;
    use crate::source::cdc::{DebeziumCdcSplit, Mysql};
    use crate::source::kafka::KafkaSplit;

    #[test]
    fn test_split_impl_get_fn() -> Result<()> {
        let split = KafkaSplit::new(0, Some(0), Some(0), "demo".to_owned());
        let split_impl = SplitImpl::Kafka(split.clone());
        let get_value = split_impl.into_kafka().unwrap();
        println!("{:?}", get_value);
        assert_eq!(split.encode_to_bytes(), get_value.encode_to_bytes());
        assert_eq!(split.encode_to_json(), get_value.encode_to_json());

        Ok(())
    }

    #[test]
    fn test_cdc_split_state() -> Result<()> {
        let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
        let split = DebeziumCdcSplit::<Mysql>::new(1001, Some(offset_str.to_owned()), None);
        let split_impl = SplitImpl::MysqlCdc(split);
        let encoded_split = split_impl.encode_to_bytes();
        let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );

        let encoded_split = split_impl.encode_to_json();
        let restored_split_impl = SplitImpl::restore_from_json(encoded_split)?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );
        Ok(())
    }

    #[test]
    fn test_extract_nexmark_config() {
        let props = convert_args!(btreemap!(
            "connector" => "nexmark",
            "nexmark.table.type" => "Person",
            "nexmark.split.num" => "1",
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();

        if let ConnectorProperties::Nexmark(props) = props {
            assert_eq!(props.table_type, Some(EventType::Person));
            assert_eq!(props.split_num, 1);
        } else {
            panic!("extract nexmark config failed");
        }
    }

    #[test]
    fn test_extract_kafka_config() {
        let props = convert_args!(btreemap!(
            "connector" => "kafka",
            "properties.bootstrap.server" => "b1,b2",
            "topic" => "test",
            "scan.startup.mode" => "earliest",
            "broker.rewrite.endpoints" => r#"{"b-1:9092":"dns-1", "b-2:9092":"dns-2"}"#,
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();
        if let ConnectorProperties::Kafka(k) = props {
            let btreemap = btreemap! {
                "b-1:9092".to_owned() => "dns-1".to_owned(),
                "b-2:9092".to_owned() => "dns-2".to_owned(),
            };
            assert_eq!(k.privatelink_common.broker_rewrite_map, Some(btreemap));
        } else {
            panic!("extract kafka config failed");
        }
    }

    #[test]
    fn test_extract_cdc_properties() {
        let user_props_mysql = convert_args!(btreemap!(
            "connector" => "mysql-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "3306",
            "database.user" => "root",
            "database.password" => "123456",
            "database.name" => "mydb",
            "table.name" => "products",
        ));

        let user_props_postgres = convert_args!(btreemap!(
            "connector" => "postgres-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "5432",
            "database.user" => "root",
            "database.password" => "654321",
            "schema.name" => "public",
            "database.name" => "mypgdb",
            "table.name" => "orders",
        ));

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_mysql),
            true,
        )
        .unwrap();
        if let ConnectorProperties::MysqlCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "3306");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "123456");
            assert_eq!(c.properties.get("database.name").unwrap(), "mydb");
            assert_eq!(c.properties.get("table.name").unwrap(), "products");
        } else {
            panic!("extract cdc config failed");
        }

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_postgres),
            true,
        )
        .unwrap();
        if let ConnectorProperties::PostgresCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "5432");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "654321");
            assert_eq!(c.properties.get("schema.name").unwrap(), "public");
            assert_eq!(c.properties.get("database.name").unwrap(), "mypgdb");
            assert_eq!(c.properties.get("table.name").unwrap(), "orders");
        } else {
            panic!("extract cdc config failed");
        }
    }
}