risingwave_connector/source/base.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use rw_futures_util::select_all;
use serde::de::DeserializeOwned;
use serde_json::json;
use tokio::sync::mpsc;

use super::cdc::DebeziumCdcMeta;
use super::datagen::DatagenMeta;
use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::kinesis::KinesisMeta;
use super::monitor::SourceMetrics;
use super::nats::source::NatsMeta;
use super::nexmark::source::message::NexmarkMeta;
use super::pulsar::source::PulsarMeta;
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult as Result;
use crate::parser::ParserConfig;
use crate::parser::schema_change::SchemaChangeEnvelope;
use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
use crate::source::batch::BatchSourceSplitImpl;
use crate::source::monitor::EnumeratorMetrics;
use crate::with_options::WithOptions;
use crate::{
    WithOptionsSecResolved, WithPropertiesExt, dispatch_source_prop, dispatch_split_impl,
    for_all_connections, for_all_sources, impl_connection, impl_connector_properties, impl_split,
    match_source_name_str,
};

const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

pub const WEBHOOK_CONNECTOR: &str = "webhook";

/// Callback wrapper for reporting CDC auto schema change failure events.
/// Parameters: (`table_id`, `table_name`, `cdc_table_id`, `upstream_ddl`, `fail_info`)
#[derive(Clone)]
pub struct CdcAutoSchemaChangeFailCallback(
    Arc<dyn Fn(u32, String, String, String, String) + Send + Sync>,
);

impl CdcAutoSchemaChangeFailCallback {
    pub fn new<F>(f: F) -> Self
    where
        F: Fn(u32, String, String, String, String) + Send + Sync + 'static,
    {
        Self(Arc::new(f))
    }

    pub fn call(
        &self,
        table_id: u32,
        table_name: String,
        cdc_table_id: String,
        upstream_ddl: String,
        fail_info: String,
    ) {
        self.0(table_id, table_name, cdc_table_id, upstream_ddl, fail_info);
    }
}

impl std::fmt::Debug for CdcAutoSchemaChangeFailCallback {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("CdcAutoSchemaChangeFailCallback")
    }
}
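
// Illustrative sketch: wiring a logging closure into the callback. The closure
// and the `tracing` call are hypothetical, but the parameter order follows the
// doc comment above: (table_id, table_name, cdc_table_id, upstream_ddl, fail_info).
//
// let on_fail = CdcAutoSchemaChangeFailCallback::new(
//     |table_id, table_name, cdc_table_id, upstream_ddl, fail_info| {
//         tracing::warn!(
//             table_id,
//             %table_name,
//             %cdc_table_id,
//             %upstream_ddl,
//             %fail_info,
//             "CDC auto schema change failed",
//         );
//     },
// );
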
pub trait TryFromBTreeMap: Sized + UnknownFields {
    /// Used to initialize the source properties from the raw untyped `WITH` options.
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self>;
}

/// Represents `WITH` options for sources.
///
/// Each instance should add a `#[derive(with_options::WithOptions)]` marker.
pub trait SourceProperties:
    TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug + EnforceSecret
{
    const SOURCE_NAME: &'static str;
    type Split: SplitMetaData
        + TryFrom<SplitImpl, Error = crate::error::ConnectorError>
        + Into<SplitImpl>;
    type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
    type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

    /// Load additional info from `PbSource`. Currently only used by CDC.
    fn init_from_pb_source(&mut self, _source: &PbSource) {}

    /// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
    fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}

pub trait UnknownFields {
    /// Unrecognized fields in the `WITH` clause.
    fn unknown_fields(&self) -> HashMap<String, String>;
}

impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let json_value = serde_json::to_value(props)?;
        let res = serde_json::from_value::<P>(json_value)?;

        if !deny_unknown_fields || res.unknown_fields().is_empty() {
            Ok(res)
        } else {
            bail!(
                "Unknown fields in the WITH clause: {:?}",
                res.unknown_fields()
            )
        }
    }
}
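
// A minimal sketch of how a properties type typically satisfies both bounds:
// serde collects unrecognized `WITH` options into a flattened map, which the
// `UnknownFields` impl then exposes. `MyProps` is hypothetical, not a real
// connector.
//
// #[derive(Clone, Debug, serde::Deserialize)]
// struct MyProps {
//     topic: String,
//     #[serde(flatten)]
//     unknown_fields: HashMap<String, String>,
// }
//
// impl UnknownFields for MyProps {
//     fn unknown_fields(&self) -> HashMap<String, String> {
//         self.unknown_fields.clone()
//     }
// }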

#[derive(Default)]
pub struct CreateSplitReaderOpt {
    pub support_multiple_splits: bool,
    pub seek_to_latest: bool,
}

#[derive(Default)]
pub struct CreateSplitReaderResult {
    pub latest_splits: Option<Vec<SplitImpl>>,
    pub backfill_info: HashMap<SplitId, BackfillInfo>,
}

pub async fn create_split_readers<P: SourceProperties>(
    prop: P,
    splits: Vec<SplitImpl>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
    opt: CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> {
    let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
    let mut res = CreateSplitReaderResult {
        backfill_info: HashMap::new(),
        latest_splits: None,
    };
    if opt.support_multiple_splits {
        let mut reader = P::SplitReader::new(
            prop.clone(),
            splits,
            parser_config.clone(),
            source_ctx.clone(),
            columns.clone(),
        )
        .await?;
        if opt.seek_to_latest {
            res.latest_splits = Some(reader.seek_to_latest().await?);
        }
        res.backfill_info = reader.backfill_info();
        Ok((reader.into_stream().boxed(), res))
    } else {
        let mut readers = try_join_all(splits.into_iter().map(|split| {
            // TODO: is this reader split across multiple threads...? Realistically, we want
            // source_ctx to live in a single actor.
            P::SplitReader::new(
                prop.clone(),
                vec![split],
                parser_config.clone(),
                source_ctx.clone(),
                columns.clone(),
            )
        }))
        .await?;
        if opt.seek_to_latest {
            let mut latest_splits = vec![];
            for reader in &mut readers {
                latest_splits.extend(reader.seek_to_latest().await?);
            }
            res.latest_splits = Some(latest_splits);
        }
        res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
        Ok((
            select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
            res,
        ))
    }
}

/// [`SplitEnumerator`] fetches the split metadata from the external source service.
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
#[async_trait]
pub trait SplitEnumerator: Sized + Send {
    type Split: SplitMetaData + Send;
    type Properties;

    async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
    -> Result<Self>;
    async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
    /// Do some cleanup work when a fragment is dropped, e.g., drop Kafka consumer group.
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
    /// Do some cleanup work when a backfill fragment is finished, e.g., drop Kafka consumer group.
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
    /// Called after `worker.tick()` execution to perform periodic operations,
    /// such as monitoring upstream PostgreSQL `confirmed_flush_lsn`, etc.
    /// This can be extended to support more periodic operations in the future.
    async fn on_tick(&mut self) -> Result<()> {
        Ok(())
    }
}

pub type SourceContextRef = Arc<SourceContext>;
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;

/// Dyn-compatible [`SplitEnumerator`].
#[async_trait]
pub trait AnySplitEnumerator: Send {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
    async fn on_tick(&mut self) -> Result<()>;
}

#[async_trait]
impl<T: SplitEnumerator<Split: Into<SplitImpl>> + 'static> AnySplitEnumerator for T {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
        SplitEnumerator::list_splits(self)
            .await
            .map(|s| s.into_iter().map(|s| s.into()).collect())
    }

    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_drop_fragments(self, _fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        SplitEnumerator::on_finish_backfill(self, _fragment_ids).await
    }

    async fn on_tick(&mut self) -> Result<()> {
        SplitEnumerator::on_tick(self).await
    }
}

/// The maximum size of a chunk yielded by the source stream.
pub const MAX_CHUNK_SIZE: usize = 1024;

#[derive(Debug, Clone, Copy)]
pub struct SourceCtrlOpts {
    /// The maximum size of a chunk yielded by the source stream.
    pub chunk_size: usize,
    /// Whether to allow splitting a transaction into multiple chunks to meet the `chunk_size` limit.
    pub split_txn: bool,
}

// The options in `SourceCtrlOpts` are so important that we don't want to impl `Default` for it,
// so that we can prevent any unintentional use of the default value.
impl !Default for SourceCtrlOpts {}

impl SourceCtrlOpts {
    #[cfg(test)]
    pub fn for_test() -> Self {
        SourceCtrlOpts {
            chunk_size: 256,
            split_txn: false,
        }
    }
}
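
// Since `Default` is banned above, call sites must spell out every option
// explicitly, e.g. (illustrative):
//
// let opts = SourceCtrlOpts {
//     chunk_size: MAX_CHUNK_SIZE,
//     split_txn: false,
// };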

#[derive(Debug)]
pub struct SourceEnumeratorContext {
    pub info: SourceEnumeratorInfo,
    pub metrics: Arc<EnumeratorMetrics>,
}

impl SourceEnumeratorContext {
    /// Create a dummy `SourceEnumeratorContext` for testing purposes, or for situations
    /// where the real context doesn't matter.
    pub fn dummy() -> SourceEnumeratorContext {
        SourceEnumeratorContext {
            info: SourceEnumeratorInfo { source_id: 0 },
            metrics: Arc::new(EnumeratorMetrics::default()),
        }
    }
}

#[derive(Clone, Debug)]
pub struct SourceEnumeratorInfo {
    pub source_id: u32,
}

#[derive(Clone, Debug)]
pub struct SourceContext {
    pub actor_id: u32,
    pub source_id: TableId,
    pub fragment_id: u32,
    pub source_name: String,
    pub metrics: Arc<SourceMetrics>,
    pub source_ctrl_opts: SourceCtrlOpts,
    pub connector_props: ConnectorProperties,
    // The source parser puts schema change events into this channel.
    pub schema_change_tx:
        Option<mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>>,
    // Callback to report CDC auto schema change failure events.
    pub on_cdc_auto_schema_change_failure: Option<CdcAutoSchemaChangeFailCallback>,
}

impl SourceContext {
    pub fn new(
        actor_id: u32,
        source_id: TableId,
        fragment_id: u32,
        source_name: String,
        metrics: Arc<SourceMetrics>,
        source_ctrl_opts: SourceCtrlOpts,
        connector_props: ConnectorProperties,
        schema_change_channel: Option<
            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
        >,
    ) -> Self {
        Self::new_with_auto_schema_change_callback(
            actor_id,
            source_id,
            fragment_id,
            source_name,
            metrics,
            source_ctrl_opts,
            connector_props,
            schema_change_channel,
            None,
        )
    }

    pub fn new_with_auto_schema_change_callback(
        actor_id: u32,
        source_id: TableId,
        fragment_id: u32,
        source_name: String,
        metrics: Arc<SourceMetrics>,
        source_ctrl_opts: SourceCtrlOpts,
        connector_props: ConnectorProperties,
        schema_change_channel: Option<
            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
        >,
        on_cdc_auto_schema_change_failure: Option<CdcAutoSchemaChangeFailCallback>,
    ) -> Self {
        Self {
            actor_id,
            source_id,
            fragment_id,
            source_name,
            metrics,
            source_ctrl_opts,
            connector_props,
            schema_change_tx: schema_change_channel,
            on_cdc_auto_schema_change_failure,
        }
    }

    /// Create a dummy `SourceContext` for testing purposes, or for situations
    /// where the real context doesn't matter.
    pub fn dummy() -> Self {
        Self::new(
            0,
            TableId::new(0),
            0,
            "dummy".to_owned(),
            Arc::new(SourceMetrics::default()),
            SourceCtrlOpts {
                chunk_size: MAX_CHUNK_SIZE,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        )
    }

    /// Report a CDC auto schema change failure event.
    /// Parameters: (`table_id`, `table_name`, `cdc_table_id`, `upstream_ddl`, `fail_info`)
    pub fn on_cdc_auto_schema_change_failure(
        &self,
        table_id: u32,
        table_name: String,
        cdc_table_id: String,
        upstream_ddl: String,
        fail_info: String,
    ) {
        if let Some(ref cdc_auto_schema_change_fail_callback) =
            self.on_cdc_auto_schema_change_failure
        {
            cdc_auto_schema_change_fail_callback.call(
                table_id,
                table_name,
                cdc_table_id,
                upstream_ddl,
                fail_info,
            );
        }
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceFormat {
    #[default]
    Invalid,
    Native,
    None,
    Debezium,
    DebeziumMongo,
    Maxwell,
    Canal,
    Upsert,
    Plain,
}

/// Refer to [`crate::parser::EncodingProperties`]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceEncode {
    #[default]
    Invalid,
    Native,
    None,
    Avro,
    Csv,
    Protobuf,
    Json,
    Bytes,
    Parquet,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SourceStruct {
    pub format: SourceFormat,
    pub encode: SourceEncode,
}

impl SourceStruct {
    pub fn new(format: SourceFormat, encode: SourceEncode) -> Self {
        Self { format, encode }
    }
}

// Only returns a valid (format, encode) pair.
pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};

    // Metadata written by an old version: the deprecated `row_format` field.
    if let Ok(format) = info.get_row_format() {
        let (format, encode) = match format {
            RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
            RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
            RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
            RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
            RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
            RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
            RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
            RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
            RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
            RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
            RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
            RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json),
            RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
            RowFormatType::RowUnspecified => unreachable!(),
        };
        return Ok(SourceStruct::new(format, encode));
    }
    let source_format = info.get_format()?;
    let source_encode = info.get_row_encode()?;
    let (format, encode) = match (source_format, source_encode) {
        (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Protobuf) => {
            (SourceFormat::Plain, SourceEncode::Protobuf)
        }
        (PbFormatType::Debezium, PbEncodeType::Json) => {
            (SourceFormat::Debezium, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro),
        (PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json),
        (PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv),
        (PbFormatType::Plain, PbEncodeType::Parquet) => {
            (SourceFormat::Plain, SourceEncode::Parquet)
        }
        (PbFormatType::Native, PbEncodeType::Native) => {
            (SourceFormat::Native, SourceEncode::Native)
        }
        (PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
        (PbFormatType::Debezium, PbEncodeType::Avro) => {
            (SourceFormat::Debezium, SourceEncode::Avro)
        }
        (PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json),
        (PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro),
        (PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
            (SourceFormat::DebeziumMongo, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes),
        (PbFormatType::Upsert, PbEncodeType::Protobuf) => {
            (SourceFormat::Upsert, SourceEncode::Protobuf)
        }
        (format, encode) => {
            bail!(
                "Unsupported combination of format {:?} and encode {:?}",
                format,
                encode
            );
        }
    };
    Ok(SourceStruct::new(format, encode))
}

/// Stream of [`SourceMessage`]. Messages flow through the stream in batches.
pub type BoxSourceMessageStream =
    BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
/// Stream of [`StreamChunk`]s parsed from the messages from the external source.
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
/// `StreamChunk` with the latest split state.
/// The state is constructed in `StreamReaderBuilder::into_retry_stream`.
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
/// See [`StreamChunkWithState`].
pub type BoxSourceChunkWithStateStream =
    BoxStream<'static, crate::error::ConnectorResult<StreamChunkWithState>>;

/// Stream of [`Option<StreamChunk>`]s parsed from the messages from the external source.
pub type BoxStreamingFileSourceChunkStream =
    BoxStream<'static, crate::error::ConnectorResult<Option<StreamChunk>>>;

// Manually expand the trait alias to improve IDE experience.
pub trait SourceChunkStream:
    Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}
impl<T> SourceChunkStream for T where
    T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}

pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;

/// [`SplitReader`] is the abstraction over the external connectors' read interface. It is
/// responsible for reading messages from the external source and parsing them into a stream
/// of [`StreamChunk`]s.
#[async_trait]
pub trait SplitReader: Sized + Send {
    type Properties;
    type Split: SplitMetaData;

    async fn new(
        properties: Self::Properties,
        state: Vec<Self::Split>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> crate::error::ConnectorResult<Self>;

    fn into_stream(self) -> BoxSourceChunkStream;

    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        HashMap::new()
    }

    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        Err(anyhow!("seek_to_latest is not supported for this connector").into())
    }
}
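
// Lifecycle note: `new` receives the assigned splits as the reader's initial
// state, and `into_stream` consumes the reader. Per-reader metadata such as
// `backfill_info()` (and the result of `seek_to_latest()`) must therefore be
// collected before streaming starts, which is exactly what
// `create_split_readers` above does.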

/// Information used to determine whether we should start and finish source backfill.
///
/// XXX: if a connector cannot provide the latest offsets (but we want to make it shareable),
/// perhaps we should ban blocking DDL for it.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
    HasDataToBackfill {
        /// The last available offset of this split (**inclusive**).
        ///
        /// This will be used to determine whether source backfill is finished when
        /// there are no _new_ messages coming from upstream `SourceExecutor`. Otherwise,
        /// blocking DDL cannot finish until new messages come.
        ///
        /// When there are upstream messages, we will use the latest offsets from the upstream.
        latest_offset: String,
    },
    /// If there are no messages in the split at all, we don't need to start backfill.
    /// In this case, there will be no messages from the backfill stream either.
    /// If we started backfill, we couldn't finish it until new messages came.
    /// So we mark this as a special case for optimization.
    NoDataToBackfill,
}

for_all_sources!(impl_connector_properties);

impl Default for ConnectorProperties {
    fn default() -> Self {
        ConnectorProperties::Test(Box::default())
    }
}

impl ConnectorProperties {
    /// Creates typed source properties from the raw `WITH` properties.
    ///
    /// It checks the `connector` field, and then dispatches to the corresponding type's `try_from_btreemap` method.
    ///
    /// `deny_unknown_fields`: Since `WITH` options are persisted in meta, we do not deny unknown fields when restoring from
    /// existing data to avoid breaking backwards compatibility. We only deny unknown fields when creating new sources.
    pub fn extract(
        with_properties: WithOptionsSecResolved,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let (options, secret_refs) = with_properties.into_parts();
        let mut options_with_secret =
            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
        let connector = options_with_secret
            .remove(UPSTREAM_SOURCE_KEY)
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::try_from_btreemap(options_with_secret, deny_unknown_fields)
                .map(ConnectorProperties::from),
            |other| bail!("connector '{}' is not supported", other)
        )
    }
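
    // Illustrative usage, mirroring `test_extract_nexmark_config` at the bottom
    // of this file (option keys are connector-specific):
    //
    // let raw = convert_args!(btreemap!(
    //     "connector" => "nexmark",
    //     "nexmark.table.type" => "Person",
    // ));
    // let props = ConnectorProperties::extract(
    //     WithOptionsSecResolved::without_secrets(raw),
    //     true, // deny_unknown_fields: we are creating a new source
    // )?;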

    pub fn enforce_secret_source(
        with_properties: &impl WithPropertiesExt,
    ) -> crate::error::ConnectorResult<()> {
        let connector = with_properties
            .get_connector()
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        let key_iter = with_properties.key_iter();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::enforce_secret(key_iter),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn enable_drop_split(&self) -> bool {
        // Enable split scale-in only for Kinesis and NATS.
        matches!(
            self,
            ConnectorProperties::Kinesis(_) | ConnectorProperties::Nats(_)
        )
    }

    /// For most connectors, this should be false. When enabled, RisingWave should not track any progress.
    pub fn enable_adaptive_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Nats(_))
    }

    /// Load additional info from `PbSource`. Currently only used by CDC.
    pub fn init_from_pb_source(&mut self, source: &PbSource) {
        dispatch_source_prop!(self, |prop| prop.init_from_pb_source(source))
    }

    /// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
    pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
        dispatch_source_prop!(self, |prop| prop
            .init_from_pb_cdc_table_desc(cdc_table_desc))
    }

    pub fn support_multiple_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Kafka(_))
            || matches!(self, ConnectorProperties::OpendalS3(_))
            || matches!(self, ConnectorProperties::Gcs(_))
            || matches!(self, ConnectorProperties::Azblob(_))
    }

    pub async fn create_split_enumerator(
        self,
        context: crate::source::base::SourceEnumeratorContextRef,
    ) -> crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
        let enumerator: Box<dyn AnySplitEnumerator> = dispatch_source_prop!(self, |prop| Box::new(
            <PropType as SourceProperties>::SplitEnumerator::new(*prop, context).await?
        ));
        Ok(enumerator)
    }

    pub async fn create_split_reader(
        self,
        splits: Vec<SplitImpl>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
        mut opt: crate::source::CreateSplitReaderOpt,
    ) -> Result<(BoxSourceChunkStream, crate::source::CreateSplitReaderResult)> {
        opt.support_multiple_splits = self.support_multiple_splits();
        tracing::debug!(
            ?splits,
            support_multiple_splits = opt.support_multiple_splits,
            "spawning connector split reader",
        );

        dispatch_source_prop!(self, |prop| create_split_readers(
            *prop,
            splits,
            parser_config,
            source_ctx,
            columns,
            opt
        )
        .await)
    }
}
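
// Sketch of the typical call sequence (the bindings are placeholders): the meta
// side lists splits via an enumerator, and the compute side turns the assigned
// splits into a chunk stream.
//
// let mut enumerator = props
//     .clone()
//     .create_split_enumerator(Arc::new(SourceEnumeratorContext::dummy()))
//     .await?;
// let splits: Vec<SplitImpl> = enumerator.list_splits().await?;
//
// let (chunk_stream, _result) = props
//     .create_split_reader(
//         splits,
//         parser_config,
//         Arc::new(SourceContext::dummy()),
//         None,
//         CreateSplitReaderOpt::default(),
//     )
//     .await?;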

for_all_sources!(impl_split);
for_all_connections!(impl_connection);

impl From<&SplitImpl> for ConnectorSplit {
    fn from(split: &SplitImpl) -> Self {
        dispatch_split_impl!(split, |inner| {
            ConnectorSplit {
                split_type: String::from(PropType::SOURCE_NAME),
                encoded_split: inner.encode_to_bytes().to_vec(),
            }
        })
    }
}

impl TryFrom<&ConnectorSplit> for SplitImpl {
    type Error = crate::error::ConnectorError;

    fn try_from(split: &ConnectorSplit) -> std::result::Result<Self, Self::Error> {
        let split_type = split.split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            {
                <PropType as SourceProperties>::Split::restore_from_bytes(
                    split.encoded_split.as_ref(),
                )
                .map(Into::into)
            },
            |other| bail!("connector '{}' is not supported", other)
        )
    }
}

impl SplitImpl {
    fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
        let split_type = split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            <PropType as SourceProperties>::Split::restore_from_json(value).map(Into::into),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn is_cdc_split(&self) -> bool {
        matches!(
            self,
            MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) | SqlServerCdc(_)
        )
    }

    /// Get the current split offset.
    pub fn get_cdc_split_offset(&self) -> String {
        match self {
            MysqlCdc(split) => split.start_offset().clone().unwrap_or_default(),
            PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(),
            MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(),
            CitusCdc(split) => split.start_offset().clone().unwrap_or_default(),
            SqlServerCdc(split) => split.start_offset().clone().unwrap_or_default(),
            _ => unreachable!("get_cdc_split_offset() is only for cdc split"),
        }
    }

    pub fn into_batch_split(self) -> Option<BatchSourceSplitImpl> {
        match self {
            SplitImpl::BatchPosixFs(batch_posix_fs_split) => {
                Some(BatchSourceSplitImpl::BatchPosixFs(batch_posix_fs_split))
            }
            _ => None,
        }
    }
}

impl SplitMetaData for SplitImpl {
    fn id(&self) -> SplitId {
        dispatch_split_impl!(self, |inner| inner.id())
    }

    fn encode_to_json(&self) -> JsonbVal {
        use serde_json::json;
        let inner = self.encode_to_json_inner().take();
        json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
    }

    fn restore_from_json(value: JsonbVal) -> Result<Self> {
        let mut value = value.take();
        let json_obj = value.as_object_mut().unwrap();
        let split_type = json_obj
            .remove(SPLIT_TYPE_FIELD)
            .unwrap()
            .as_str()
            .unwrap()
            .to_owned();
        let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
        Self::restore_from_json_inner(&split_type, inner_value.into())
    }

    fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset))
    }
}

impl SplitImpl {
    pub fn get_type(&self) -> String {
        dispatch_split_impl!(self, |_inner| PropType::SOURCE_NAME.to_owned())
    }

    pub fn update_in_place(&mut self, last_seen_offset: String) -> Result<()> {
        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset)?);
        Ok(())
    }

    pub fn encode_to_json_inner(&self) -> JsonbVal {
        dispatch_split_impl!(self, |inner| inner.encode_to_json())
    }
}

use risingwave_common::types::DataType;

#[derive(Clone, Debug)]
pub struct Column {
    pub name: String,
    pub data_type: DataType,
    /// This field is only used by datagen.
    pub is_visible: bool,
}

/// A split id resides in every source message; use `Arc` to avoid copying.
pub type SplitId = Arc<str>;

/// The message pumped from the external source service.
/// The third-party message structs will eventually be transformed into this struct.
#[derive(Debug, Clone)]
pub struct SourceMessage {
    pub key: Option<Vec<u8>>,
    pub payload: Option<Vec<u8>>,
    pub offset: String, // TODO: use `Arc<str>`
    pub split_id: SplitId,
    pub meta: SourceMeta,
}

impl SourceMessage {
    /// Create a dummy `SourceMessage` with all fields unset for testing purposes.
    pub fn dummy() -> Self {
        Self {
            key: None,
            payload: None,
            offset: "".to_owned(),
            split_id: "".into(),
            meta: SourceMeta::Empty,
        }
    }

    /// Check whether the source message is a CDC heartbeat message.
    pub fn is_cdc_heartbeat(&self) -> bool {
        self.key.is_none() && self.payload.is_none()
    }
}

#[derive(Debug, Clone)]
pub enum SourceMeta {
    Kafka(KafkaMeta),
    Kinesis(KinesisMeta),
    Pulsar(PulsarMeta),
    Nexmark(NexmarkMeta),
    GooglePubsub(GooglePubsubMeta),
    Datagen(DatagenMeta),
    DebeziumCdc(DebeziumCdcMeta),
    Nats(NatsMeta),
    // For sources that don't have metadata.
    Empty,
}

/// Implement `PartialEq` manually to ignore the `meta` field.
impl PartialEq for SourceMessage {
    fn eq(&self, other: &Self) -> bool {
        self.offset == other.offset
            && self.split_id == other.split_id
            && self.payload == other.payload
    }
}
impl Eq for SourceMessage {}

/// The metadata of a split.
pub trait SplitMetaData: Sized {
    fn id(&self) -> SplitId;
    fn encode_to_bytes(&self) -> Bytes {
        self.encode_to_json()
            .as_scalar_ref()
            .value_serialize()
            .into()
    }
    fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
        Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
    }

    /// Encode the whole split metadata to a JSON object.
    fn encode_to_json(&self) -> JsonbVal;
    fn restore_from_json(value: JsonbVal) -> Result<Self>;
    fn update_offset(&mut self, last_seen_offset: String) -> crate::error::ConnectorResult<()>;
}
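
// The default byte codec above is a thin wrapper over the JSON codec, so for any
// split `s` a round-trip through either representation is expected to be
// lossless (see `test_cdc_split_state` below):
//
// let restored = SplitImpl::restore_from_bytes(&s.encode_to_bytes())?;
// assert_eq!(s.encode_to_json(), restored.encode_to_json());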

/// [`ConnectorState`] maintains the consuming splits' info. In specific split readers,
/// `ConnectorState` cannot be [`None`] and contains one (for MQ split readers) or many (for FS
/// split readers) [`SplitImpl`]s. If no split is assigned to the source executor, `ConnectorState` is
/// [`None`] and the created source stream will be a pending stream.
pub type ConnectorState = Option<Vec<SplitImpl>>;

#[cfg(test)]
mod tests {
    use maplit::*;
    use nexmark::event::EventType;

    use super::*;
    use crate::source::cdc::{DebeziumCdcSplit, Mysql};
    use crate::source::kafka::KafkaSplit;

    #[test]
    fn test_split_impl_get_fn() -> Result<()> {
        let split = KafkaSplit::new(0, Some(0), Some(0), "demo".to_owned());
        let split_impl = SplitImpl::Kafka(split.clone());
        let get_value = split_impl.into_kafka().unwrap();
        println!("{:?}", get_value);
        assert_eq!(split.encode_to_bytes(), get_value.encode_to_bytes());
        assert_eq!(split.encode_to_json(), get_value.encode_to_json());

        Ok(())
    }

    #[test]
    fn test_cdc_split_state() -> Result<()> {
        let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
        let split = DebeziumCdcSplit::<Mysql>::new(1001, Some(offset_str.to_owned()), None);
        let split_impl = SplitImpl::MysqlCdc(split);
        let encoded_split = split_impl.encode_to_bytes();
        let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );

        let encoded_split = split_impl.encode_to_json();
        let restored_split_impl = SplitImpl::restore_from_json(encoded_split)?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );
        Ok(())
    }

    #[test]
    fn test_extract_nexmark_config() {
        let props = convert_args!(btreemap!(
            "connector" => "nexmark",
            "nexmark.table.type" => "Person",
            "nexmark.split.num" => "1",
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();

        if let ConnectorProperties::Nexmark(props) = props {
            assert_eq!(props.table_type, Some(EventType::Person));
            assert_eq!(props.split_num, 1);
        } else {
            panic!("extract nexmark config failed");
        }
    }

    #[test]
    fn test_extract_kafka_config() {
        let props = convert_args!(btreemap!(
            "connector" => "kafka",
            "properties.bootstrap.server" => "b1,b2",
            "topic" => "test",
            "scan.startup.mode" => "earliest",
            "broker.rewrite.endpoints" => r#"{"b-1:9092":"dns-1", "b-2:9092":"dns-2"}"#,
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();
        if let ConnectorProperties::Kafka(k) = props {
            let btreemap = btreemap! {
                "b-1:9092".to_owned() => "dns-1".to_owned(),
                "b-2:9092".to_owned() => "dns-2".to_owned(),
            };
            assert_eq!(k.privatelink_common.broker_rewrite_map, Some(btreemap));
        } else {
            panic!("extract kafka config failed");
        }
    }

    #[test]
    fn test_extract_cdc_properties() {
        let user_props_mysql = convert_args!(btreemap!(
            "connector" => "mysql-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "3306",
            "database.user" => "root",
            "database.password" => "123456",
            "database.name" => "mydb",
            "table.name" => "products",
        ));

        let user_props_postgres = convert_args!(btreemap!(
            "connector" => "postgres-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "5432",
            "database.user" => "root",
            "database.password" => "654321",
            "schema.name" => "public",
            "database.name" => "mypgdb",
            "table.name" => "orders",
        ));

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_mysql),
            true,
        )
        .unwrap();
        if let ConnectorProperties::MysqlCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "3306");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "123456");
            assert_eq!(c.properties.get("database.name").unwrap(), "mydb");
            assert_eq!(c.properties.get("table.name").unwrap(), "products");
        } else {
            panic!("extract cdc config failed");
        }

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_postgres),
            true,
        )
        .unwrap();
        if let ConnectorProperties::PostgresCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "5432");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "654321");
            assert_eq!(c.properties.get("schema.name").unwrap(), "public");
            assert_eq!(c.properties.get("database.name").unwrap(), "mypgdb");
            assert_eq!(c.properties.get("table.name").unwrap(), "orders");
        } else {
            panic!("extract cdc config failed");
        }
    }
}