risingwave_connector/source/
base.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use async_trait::async_trait;
20use bytes::Bytes;
21use enum_as_inner::EnumAsInner;
22use futures::future::try_join_all;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt};
25use itertools::Itertools;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::bail;
28use risingwave_common::id::{ActorId, FragmentId, SourceId};
29use risingwave_common::secret::LocalSecretManager;
30use risingwave_common::types::{JsonbVal, Scalar};
31use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
32use risingwave_pb::plan_common::ExternalTableDesc;
33use risingwave_pb::source::ConnectorSplit;
34use rw_futures_util::select_all;
35use serde::de::DeserializeOwned;
36use serde_json::json;
37use tokio::sync::mpsc;
38
39use super::cdc::DebeziumCdcMeta;
40use super::datagen::DatagenMeta;
41use super::google_pubsub::GooglePubsubMeta;
42use super::kafka::KafkaMeta;
43use super::kinesis::KinesisMeta;
44use super::monitor::SourceMetrics;
45use super::nats::source::NatsMeta;
46use super::nexmark::source::message::NexmarkMeta;
47use super::pulsar::source::PulsarMeta;
48use crate::enforce_secret::EnforceSecret;
49use crate::error::ConnectorResult as Result;
50use crate::parser::ParserConfig;
51use crate::parser::schema_change::SchemaChangeEnvelope;
52use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
53use crate::source::batch::BatchSourceSplitImpl;
54use crate::source::monitor::EnumeratorMetrics;
55use crate::with_options::WithOptions;
56use crate::{
57    WithOptionsSecResolved, WithPropertiesExt, dispatch_source_prop, dispatch_split_impl,
58    for_all_connections, for_all_sources, impl_connection, impl_connector_properties, impl_split,
59    match_source_name_str,
60};
61
/// JSON key under which `SplitImpl::encode_to_json` stores the split's connector type.
const SPLIT_TYPE_FIELD: &str = "split_type";
/// JSON key under which `SplitImpl::encode_to_json` stores the connector-specific split payload.
const SPLIT_INFO_FIELD: &str = "split_info";
/// Key of the `WITH` option that names the connector; it is removed from the options and
/// lower-cased by `ConnectorProperties::extract`.
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

// NOTE(review): presumably the connector name identifying webhook sources; it is not referenced
// in the visible part of this file — confirm at the call sites.
pub const WEBHOOK_CONNECTOR: &str = "webhook";
67
/// Callback wrapper for reporting CDC auto schema change fail events.
///
/// The closure is stored behind an [`Arc`], so clones of this wrapper share the same closure.
///
/// Parameters: (`source_id`, `table_name`, `cdc_table_id`, `upstream_ddl`, `fail_info`)
#[derive(Clone)]
pub struct CdcAutoSchemaChangeFailCallback(
    Arc<dyn Fn(SourceId, String, String, String, String) + Send + Sync>,
);
74
75impl CdcAutoSchemaChangeFailCallback {
76    pub fn new<F>(f: F) -> Self
77    where
78        F: Fn(SourceId, String, String, String, String) + Send + Sync + 'static,
79    {
80        Self(Arc::new(f))
81    }
82
83    pub fn call(
84        &self,
85        source_id: SourceId,
86        table_name: String,
87        cdc_table_id: String,
88        upstream_ddl: String,
89        fail_info: String,
90    ) {
91        self.0(source_id, table_name, cdc_table_id, upstream_ddl, fail_info);
92    }
93}
94
95impl std::fmt::Debug for CdcAutoSchemaChangeFailCallback {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        f.write_str("CdcAutoSchemaChangeFailCallback")
98    }
99}
/// Fallible construction of typed properties from raw string key/value options.
pub trait TryFromBTreeMap: Sized + UnknownFields {
    /// Used to initialize the source properties from the raw untyped `WITH` options.
    ///
    /// `deny_unknown_fields` selects whether unrecognized options are reported as an error
    /// (see the blanket implementation in this file).
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self>;
}
107
/// Represents `WITH` options for sources.
///
/// Each instance should add a `#[derive(with_options::WithOptions)]` marker.
///
/// The associated types tie a property type to its split type, split enumerator and split
/// reader, so that all three agree on `Properties` and `Split`.
pub trait SourceProperties:
    TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug + EnforceSecret
{
    /// Name of the connector. It is written as `split_type` when a split is encoded into a
    /// `ConnectorSplit` and returned by `SplitImpl::get_type`.
    const SOURCE_NAME: &'static str;
    /// Split type of this connector, convertible from and into the type-erased `SplitImpl`.
    type Split: SplitMetaData
        + TryFrom<SplitImpl, Error = crate::error::ConnectorError>
        + Into<SplitImpl>;
    /// Enumerator that lists splits of type [`Self::Split`] for these properties.
    type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
    /// Reader that consumes splits of type [`Self::Split`] for these properties.
    type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

    /// Load additional info from `PbSource`. Currently only used by CDC.
    ///
    /// The default implementation does nothing.
    fn init_from_pb_source(&mut self, _source: &PbSource) {}

    /// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
    ///
    /// The default implementation does nothing.
    fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}
127
/// Exposes the options that a property type did not recognize.
///
/// The blanket `TryFromBTreeMap` implementation in this file consults it when
/// `deny_unknown_fields` is set.
pub trait UnknownFields {
    /// Unrecognized fields in the `WITH` clause.
    fn unknown_fields(&self) -> HashMap<String, String>;
}
132
133impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
134    fn try_from_btreemap(
135        props: BTreeMap<String, String>,
136        deny_unknown_fields: bool,
137    ) -> Result<Self> {
138        let json_value = serde_json::to_value(props)?;
139        let res = serde_json::from_value::<P>(json_value)?;
140
141        if !deny_unknown_fields || res.unknown_fields().is_empty() {
142            Ok(res)
143        } else {
144            bail!(
145                "Unknown fields in the WITH clause: {:?}",
146                res.unknown_fields()
147            )
148        }
149    }
150}
151
/// Options controlling how `create_split_readers` builds readers.
#[derive(Default)]
pub struct CreateSplitReaderOpt {
    /// When `true`, one reader is created for all splits; otherwise one reader per split.
    /// `ConnectorProperties::create_split_reader` overwrites this with the connector's
    /// capability before dispatching.
    pub support_multiple_splits: bool,
    /// When `true`, `seek_to_latest` is called on every created reader and the returned
    /// splits are reported in `CreateSplitReaderResult::latest_splits`.
    pub seek_to_latest: bool,
}
157
/// Extra information produced by `create_split_readers` alongside the chunk stream.
#[derive(Default)]
pub struct CreateSplitReaderResult {
    /// Splits returned by `seek_to_latest`; `None` unless seeking was requested.
    pub latest_splits: Option<Vec<SplitImpl>>,
    /// Backfill information collected from the readers' `backfill_info`, keyed by split id.
    pub backfill_info: HashMap<SplitId, BackfillInfo>,
}
163
164pub async fn create_split_readers<P: SourceProperties>(
165    prop: P,
166    splits: Vec<SplitImpl>,
167    parser_config: ParserConfig,
168    source_ctx: SourceContextRef,
169    columns: Option<Vec<Column>>,
170    opt: CreateSplitReaderOpt,
171) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> {
172    let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
173    let mut res = CreateSplitReaderResult {
174        backfill_info: HashMap::new(),
175        latest_splits: None,
176    };
177    if opt.support_multiple_splits {
178        let mut reader = P::SplitReader::new(
179            prop.clone(),
180            splits,
181            parser_config.clone(),
182            source_ctx.clone(),
183            columns.clone(),
184        )
185        .await?;
186        if opt.seek_to_latest {
187            res.latest_splits = Some(reader.seek_to_latest().await?);
188        }
189        res.backfill_info = reader.backfill_info();
190        Ok((reader.into_stream().boxed(), res))
191    } else {
192        let mut readers = try_join_all(splits.into_iter().map(|split| {
193            // TODO: is this reader split across multiple threads...? Realistically, we want
194            // source_ctx to live in a single actor.
195            P::SplitReader::new(
196                prop.clone(),
197                vec![split],
198                parser_config.clone(),
199                source_ctx.clone(),
200                columns.clone(),
201            )
202        }))
203        .await?;
204        if opt.seek_to_latest {
205            let mut latest_splits = vec![];
206            for reader in &mut readers {
207                latest_splits.extend(reader.seek_to_latest().await?);
208            }
209            res.latest_splits = Some(latest_splits);
210        }
211        res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
212        Ok((
213            select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
214            res,
215        ))
216    }
217}
218
/// [`SplitEnumerator`] fetches the split metadata from the external source service.
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
#[async_trait]
pub trait SplitEnumerator: Sized + Send {
    /// Split type produced by [`Self::list_splits`].
    type Split: SplitMetaData + Send;
    /// Properties the enumerator is constructed from.
    type Properties;

    /// Creates the enumerator from its properties and the enumerator context.
    async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
    -> Result<Self>;
    /// Lists the splits currently known to the enumerator.
    async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
    /// Do some cleanup work when a fragment is dropped, e.g., drop Kafka consumer group.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<FragmentId>) -> Result<()> {
        Ok(())
    }
    /// Do some cleanup work when a backfill fragment is finished, e.g., drop Kafka consumer group.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<FragmentId>) -> Result<()> {
        Ok(())
    }
    /// Called after `worker.tick()` execution to perform periodic operations,
    /// such as monitoring upstream PostgreSQL `confirmed_flush_lsn`, etc.
    /// This can be extended to support more periodic operations in the future.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn on_tick(&mut self) -> Result<()> {
        Ok(())
    }
}
244
/// Shared, reference-counted [`SourceContext`]; this is what `SplitReader::new` receives.
pub type SourceContextRef = Arc<SourceContext>;
/// Shared, reference-counted [`SourceEnumeratorContext`]; this is what `SplitEnumerator::new`
/// receives.
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;
247
/// Dyn-compatible [`SplitEnumerator`].
///
/// It mirrors the instance methods of [`SplitEnumerator`] but returns the type-erased
/// [`SplitImpl`], so it can be used as `Box<dyn AnySplitEnumerator>`.
#[async_trait]
pub trait AnySplitEnumerator: Send {
    /// See [`SplitEnumerator::list_splits`]; the splits are returned as [`SplitImpl`].
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
    /// See [`SplitEnumerator::on_drop_fragments`].
    async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()>;
    /// See [`SplitEnumerator::on_finish_backfill`].
    async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()>;
    /// See [`SplitEnumerator::on_tick`].
    async fn on_tick(&mut self) -> Result<()>;
}
256
257#[async_trait]
258impl<T: SplitEnumerator<Split: Into<SplitImpl>> + 'static> AnySplitEnumerator for T {
259    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
260        SplitEnumerator::list_splits(self)
261            .await
262            .map(|s| s.into_iter().map(|s| s.into()).collect())
263    }
264
265    async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()> {
266        SplitEnumerator::on_drop_fragments(self, fragment_ids).await
267    }
268
269    async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()> {
270        SplitEnumerator::on_finish_backfill(self, fragment_ids).await
271    }
272
273    async fn on_tick(&mut self) -> Result<()> {
274        SplitEnumerator::on_tick(self).await
275    }
276}
277
/// The max size of a chunk yielded by source stream.
pub const MAX_CHUNK_SIZE: usize = 1024;

/// Control options for the source stream.
#[derive(Debug, Clone, Copy)]
pub struct SourceCtrlOpts {
    /// The max size of a chunk yielded by source stream.
    pub chunk_size: usize,
    /// Whether to allow splitting a transaction into multiple chunks to meet the `chunk_size`.
    pub split_txn: bool,
}

// The options in `SourceCtrlOpts` are so important that we don't want to impl `Default` for it,
// so that we can prevent any unintentional use of the default value.
impl !Default for SourceCtrlOpts {}
292
293impl SourceCtrlOpts {
294    #[cfg(test)]
295    pub fn for_test() -> Self {
296        SourceCtrlOpts {
297            chunk_size: 256,
298            split_txn: false,
299        }
300    }
301}
302
/// Context handed (as `SourceEnumeratorContextRef`) to a `SplitEnumerator` on construction.
#[derive(Debug)]
pub struct SourceEnumeratorContext {
    /// Identifies the source the enumerator works for.
    pub info: SourceEnumeratorInfo,
    /// Shared enumerator metrics.
    pub metrics: Arc<EnumeratorMetrics>,
}
308
309impl SourceEnumeratorContext {
310    /// Create a dummy `SourceEnumeratorContext` for testing purpose, or for the situation
311    /// where the real context doesn't matter.
312    pub fn dummy() -> SourceEnumeratorContext {
313        SourceEnumeratorContext {
314            info: SourceEnumeratorInfo {
315                source_id: 0.into(),
316            },
317            metrics: Arc::new(EnumeratorMetrics::default()),
318        }
319    }
320}
321
/// Identifying information carried by a [`SourceEnumeratorContext`].
#[derive(Clone, Debug)]
pub struct SourceEnumeratorInfo {
    /// Id of the source the enumerator belongs to.
    pub source_id: SourceId,
}
326
/// Context handed (as `SourceContextRef`) to a `SplitReader` on construction.
#[derive(Clone, Debug)]
pub struct SourceContext {
    pub actor_id: ActorId,
    pub source_id: SourceId,
    pub fragment_id: FragmentId,
    pub source_name: String,
    pub metrics: Arc<SourceMetrics>,
    pub source_ctrl_opts: SourceCtrlOpts,
    pub connector_props: ConnectorProperties,
    // The source parser puts schema change events into this channel. Each event is paired
    // with a oneshot sender.
    // NOTE(review): the oneshot presumably signals that the change has been handled — confirm
    // against the receiver side.
    pub schema_change_tx:
        Option<mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>>,
    // Callback used to report CDC auto schema change fail events; `None` disables reporting.
    pub on_cdc_auto_schema_change_failure: Option<CdcAutoSchemaChangeFailCallback>,
}
342
343impl SourceContext {
344    pub fn new(
345        actor_id: ActorId,
346        source_id: SourceId,
347        fragment_id: FragmentId,
348        source_name: String,
349        metrics: Arc<SourceMetrics>,
350        source_ctrl_opts: SourceCtrlOpts,
351        connector_props: ConnectorProperties,
352        schema_change_channel: Option<
353            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
354        >,
355    ) -> Self {
356        Self::new_with_auto_schema_change_callback(
357            actor_id,
358            source_id,
359            fragment_id,
360            source_name,
361            metrics,
362            source_ctrl_opts,
363            connector_props,
364            schema_change_channel,
365            None,
366        )
367    }
368
369    pub fn new_with_auto_schema_change_callback(
370        actor_id: ActorId,
371        source_id: SourceId,
372        fragment_id: FragmentId,
373        source_name: String,
374        metrics: Arc<SourceMetrics>,
375        source_ctrl_opts: SourceCtrlOpts,
376        connector_props: ConnectorProperties,
377        schema_change_channel: Option<
378            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
379        >,
380        on_cdc_auto_schema_change_failure: Option<CdcAutoSchemaChangeFailCallback>,
381    ) -> Self {
382        Self {
383            actor_id,
384            source_id,
385            fragment_id,
386            source_name,
387            metrics,
388            source_ctrl_opts,
389            connector_props,
390            schema_change_tx: schema_change_channel,
391            on_cdc_auto_schema_change_failure,
392        }
393    }
394
395    /// Create a dummy `SourceContext` for testing purpose, or for the situation
396    /// where the real context doesn't matter.
397    pub fn dummy() -> Self {
398        Self::new(
399            0.into(),
400            SourceId::new(0),
401            0.into(),
402            "dummy".to_owned(),
403            Arc::new(SourceMetrics::default()),
404            SourceCtrlOpts {
405                chunk_size: MAX_CHUNK_SIZE,
406                split_txn: false,
407            },
408            ConnectorProperties::default(),
409            None,
410        )
411    }
412
413    /// Report CDC auto schema change fail event
414    /// Parameters: (`source_id`, `table_name`, `cdc_table_id`, `upstream_ddl`, `fail_info`)
415    pub fn on_cdc_auto_schema_change_failure(
416        &self,
417        source_id: SourceId,
418        table_name: String,
419        cdc_table_id: String,
420        upstream_ddl: String,
421        fail_info: String,
422    ) {
423        if let Some(ref cdc_auto_schema_change_fail_callback) =
424            self.on_cdc_auto_schema_change_failure
425        {
426            cdc_auto_schema_change_fail_callback.call(
427                source_id,
428                table_name,
429                cdc_table_id,
430                upstream_ddl,
431                fail_info,
432            );
433        }
434    }
435}
436
/// The `FORMAT` half of a source's (format, encode) pair; see `extract_source_struct` for
/// the combinations that are accepted.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceFormat {
    /// Default value; `extract_source_struct` never returns it.
    #[default]
    Invalid,
    Native,
    None,
    Debezium,
    DebeziumMongo,
    Maxwell,
    Canal,
    Upsert,
    Plain,
}
450
/// The `ENCODE` half of a source's (format, encode) pair.
///
/// Refer to [`crate::parser::EncodingProperties`]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceEncode {
    /// Default value; `extract_source_struct` never returns it.
    #[default]
    Invalid,
    Native,
    None,
    Avro,
    Csv,
    Protobuf,
    Json,
    Bytes,
    Parquet,
}
465
/// A (format, encode) pair describing how a source's messages are interpreted.
///
/// The derived default is (`Invalid`, `Invalid`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SourceStruct {
    pub format: SourceFormat,
    pub encode: SourceEncode,
}
471
472impl SourceStruct {
473    pub fn new(format: SourceFormat, encode: SourceEncode) -> Self {
474        Self { format, encode }
475    }
476}
477
478// Only return valid (format, encode)
479pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
480    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};
481
482    // old version meta.
483    if let Ok(format) = info.get_row_format() {
484        let (format, encode) = match format {
485            RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
486            RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
487            RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
488            RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
489            RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
490            RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
491            RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
492            RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
493            RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
494            RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
495            RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
496            RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json),
497            RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
498            RowFormatType::RowUnspecified => unreachable!(),
499        };
500        return Ok(SourceStruct::new(format, encode));
501    }
502    let source_format = info.get_format()?;
503    let source_encode = info.get_row_encode()?;
504    let (format, encode) = match (source_format, source_encode) {
505        (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
506        (PbFormatType::Plain, PbEncodeType::Protobuf) => {
507            (SourceFormat::Plain, SourceEncode::Protobuf)
508        }
509        (PbFormatType::Debezium, PbEncodeType::Json) => {
510            (SourceFormat::Debezium, SourceEncode::Json)
511        }
512        (PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro),
513        (PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json),
514        (PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json),
515        (PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv),
516        (PbFormatType::Plain, PbEncodeType::Parquet) => {
517            (SourceFormat::Plain, SourceEncode::Parquet)
518        }
519        (PbFormatType::Native, PbEncodeType::Native) => {
520            (SourceFormat::Native, SourceEncode::Native)
521        }
522        (PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
523        (PbFormatType::Debezium, PbEncodeType::Avro) => {
524            (SourceFormat::Debezium, SourceEncode::Avro)
525        }
526        (PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json),
527        (PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro),
528        (PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
529            (SourceFormat::DebeziumMongo, SourceEncode::Json)
530        }
531        (PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes),
532        (PbFormatType::Upsert, PbEncodeType::Protobuf) => {
533            (SourceFormat::Upsert, SourceEncode::Protobuf)
534        }
535        (format, encode) => {
536            bail!(
537                "Unsupported combination of format {:?} and encode {:?}",
538                format,
539                encode
540            );
541        }
542    };
543    Ok(SourceStruct::new(format, encode))
544}
545
/// Stream of [`SourceMessage`]. Messages flow through the stream in the unit of a batch.
pub type BoxSourceMessageStream =
    BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
/// Stream of [`StreamChunk`]s parsed from the messages from the external source.
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
/// `StreamChunk` with the latest split state, keyed by split id.
/// The state is constructed in `StreamReaderBuilder::into_retry_stream`
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
/// See [`StreamChunkWithState`].
pub type BoxSourceChunkWithStateStream =
    BoxStream<'static, crate::error::ConnectorResult<StreamChunkWithState>>;

/// Stream of [`Option<StreamChunk>`]s parsed from the messages from the external source.
// NOTE(review): the meaning of a `None` item is not visible in this file — confirm with the
// file source implementation.
pub type BoxStreamingFileSourceChunkStream =
    BoxStream<'static, crate::error::ConnectorResult<Option<StreamChunk>>>;
561
// Manually expand the trait alias to improve IDE experience.
/// Alias-like trait for any `Send + 'static` stream of `ConnectorResult<StreamChunk>`.
/// It has no methods and is implemented for every such stream by the blanket impl below.
pub trait SourceChunkStream:
    Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}
impl<T> SourceChunkStream for T where
    T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}

/// Boxed `'static` stream of `ConnectorResult<M>`.
pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;
573
/// [`SplitReader`] is an abstraction of the external connector read interface which is also
/// responsible for parsing: it reads messages from the outside and transforms them into a
/// stream of parsed [`StreamChunk`]s.
#[async_trait]
pub trait SplitReader: Sized + Send {
    /// Properties the reader is constructed from.
    type Properties;
    /// Split type the reader consumes.
    type Split: SplitMetaData;

    /// Creates a reader for the splits in `state`.
    async fn new(
        properties: Self::Properties,
        state: Vec<Self::Split>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> crate::error::ConnectorResult<Self>;

    /// Consumes the reader and returns its stream of parsed chunks.
    fn into_stream(self) -> BoxSourceChunkStream;

    /// Backfill information per split. The default implementation returns an empty map.
    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        HashMap::new()
    }

    /// Seeks the reader to the latest position and returns the resulting splits.
    ///
    /// The default implementation always fails with "seek_to_latest is not supported for
    /// this connector".
    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        Err(anyhow!("seek_to_latest is not supported for this connector").into())
    }
}
600
/// Information used to determine whether we should start and finish source backfill.
///
/// One value describes a single split (readers report a map keyed by split id).
///
/// XXX: if a connector cannot provide the latest offsets (but we want to make it shareable),
/// perhaps we should ban blocking DDL for it.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
    HasDataToBackfill {
        /// The last available offset of the split (**inclusive**).
        ///
        /// This will be used to determine whether source backfill is finished when
        /// there are no _new_ messages coming from upstream `SourceExecutor`. Otherwise,
        /// blocking DDL cannot finish until new messages come.
        ///
        /// When there are upstream messages, we will use the latest offsets from the upstream.
        latest_offset: String,
    },
    /// If there are no messages in the split at all, we don't need to start backfill.
    /// In this case, there will be no message from the backfill stream either.
    /// If we started backfill, we could not finish it until new messages come.
    /// So we mark this as a special case for optimization.
    NoDataToBackfill,
}
623
// Applies `impl_connector_properties` to every source listed by `for_all_sources!`.
// NOTE(review): presumably this generates the `ConnectorProperties` enum used below — confirm
// in the macro definition.
for_all_sources!(impl_connector_properties);
625
626impl Default for ConnectorProperties {
627    fn default() -> Self {
628        ConnectorProperties::Test(Box::default())
629    }
630}
631
632impl ConnectorProperties {
633    /// Creates typed source properties from the raw `WITH` properties.
634    ///
635    /// It checks the `connector` field, and them dispatches to the corresponding type's `try_from_btreemap` method.
636    ///
637    /// `deny_unknown_fields`: Since `WITH` options are persisted in meta, we do not deny unknown fields when restoring from
638    /// existing data to avoid breaking backwards compatibility. We only deny unknown fields when creating new sources.
639    pub fn extract(
640        with_properties: WithOptionsSecResolved,
641        deny_unknown_fields: bool,
642    ) -> Result<Self> {
643        let (options, secret_refs) = with_properties.into_parts();
644        let mut options_with_secret =
645            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
646        let connector = options_with_secret
647            .remove(UPSTREAM_SOURCE_KEY)
648            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
649            .to_lowercase();
650        match_source_name_str!(
651            connector.as_str(),
652            PropType,
653            PropType::try_from_btreemap(options_with_secret, deny_unknown_fields)
654                .map(ConnectorProperties::from),
655            |other| bail!("connector '{}' is not supported", other)
656        )
657    }
658
659    pub fn enforce_secret_source(
660        with_properties: &impl WithPropertiesExt,
661    ) -> crate::error::ConnectorResult<()> {
662        let connector = with_properties
663            .get_connector()
664            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
665            .to_lowercase();
666        let key_iter = with_properties.key_iter();
667        match_source_name_str!(
668            connector.as_str(),
669            PropType,
670            PropType::enforce_secret(key_iter),
671            |other| bail!("connector '{}' is not supported", other)
672        )
673    }
674
675    pub fn enable_drop_split(&self) -> bool {
676        // enable split scale in just for Kinesis
677        matches!(
678            self,
679            ConnectorProperties::Kinesis(_) | ConnectorProperties::Nats(_)
680        )
681    }
682
683    /// For most connectors, this should be false. When enabled, RisingWave should not track any progress.
684    pub fn enable_adaptive_splits(&self) -> bool {
685        matches!(self, ConnectorProperties::Nats(_))
686    }
687
688    /// Load additional info from `PbSource`. Currently only used by CDC.
689    pub fn init_from_pb_source(&mut self, source: &PbSource) {
690        dispatch_source_prop!(self, |prop| prop.init_from_pb_source(source))
691    }
692
693    /// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
694    pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
695        dispatch_source_prop!(self, |prop| prop
696            .init_from_pb_cdc_table_desc(cdc_table_desc))
697    }
698
699    pub fn support_multiple_splits(&self) -> bool {
700        matches!(self, ConnectorProperties::Kafka(_))
701            || matches!(self, ConnectorProperties::OpendalS3(_))
702            || matches!(self, ConnectorProperties::Gcs(_))
703            || matches!(self, ConnectorProperties::Azblob(_))
704    }
705
706    pub async fn create_split_enumerator(
707        self,
708        context: crate::source::base::SourceEnumeratorContextRef,
709    ) -> crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
710        let enumerator: Box<dyn AnySplitEnumerator> = dispatch_source_prop!(self, |prop| Box::new(
711            <PropType as SourceProperties>::SplitEnumerator::new(*prop, context).await?
712        ));
713        Ok(enumerator)
714    }
715
716    pub async fn create_split_reader(
717        self,
718        splits: Vec<SplitImpl>,
719        parser_config: ParserConfig,
720        source_ctx: SourceContextRef,
721        columns: Option<Vec<Column>>,
722        mut opt: crate::source::CreateSplitReaderOpt,
723    ) -> Result<(BoxSourceChunkStream, crate::source::CreateSplitReaderResult)> {
724        opt.support_multiple_splits = self.support_multiple_splits();
725        tracing::debug!(
726            ?splits,
727            support_multiple_splits = opt.support_multiple_splits,
728            "spawning connector split reader",
729        );
730
731        dispatch_source_prop!(self, |prop| create_split_readers(
732            *prop,
733            splits,
734            parser_config,
735            source_ctx,
736            columns,
737            opt
738        )
739        .await)
740    }
741}
742
// Applies `impl_split` to every source and `impl_connection` to every connection.
// NOTE(review): presumably `impl_split` generates the `SplitImpl` enum used below — confirm in
// the macro definitions.
for_all_sources!(impl_split);
for_all_connections!(impl_connection);
745
746impl From<&SplitImpl> for ConnectorSplit {
747    fn from(split: &SplitImpl) -> Self {
748        dispatch_split_impl!(split, |inner| {
749            ConnectorSplit {
750                split_type: String::from(PropType::SOURCE_NAME),
751                encoded_split: inner.encode_to_bytes().to_vec(),
752            }
753        })
754    }
755}
756
757impl TryFrom<&ConnectorSplit> for SplitImpl {
758    type Error = crate::error::ConnectorError;
759
760    fn try_from(split: &ConnectorSplit) -> std::result::Result<Self, Self::Error> {
761        let split_type = split.split_type.to_lowercase();
762        match_source_name_str!(
763            split_type.as_str(),
764            PropType,
765            {
766                <PropType as SourceProperties>::Split::restore_from_bytes(
767                    split.encoded_split.as_ref(),
768                )
769                .map(Into::into)
770            },
771            |other| bail!("connector '{}' is not supported", other)
772        )
773    }
774}
775
776impl SplitImpl {
777    fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
778        let split_type = split_type.to_lowercase();
779        match_source_name_str!(
780            split_type.as_str(),
781            PropType,
782            <PropType as SourceProperties>::Split::restore_from_json(value).map(Into::into),
783            |other| bail!("connector '{}' is not supported", other)
784        )
785    }
786
787    pub fn is_cdc_split(&self) -> bool {
788        matches!(
789            self,
790            MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) | SqlServerCdc(_)
791        )
792    }
793
794    /// Get the current split offset.
795    pub fn get_cdc_split_offset(&self) -> String {
796        match self {
797            MysqlCdc(split) => split.start_offset().clone().unwrap_or_default(),
798            PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(),
799            MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(),
800            CitusCdc(split) => split.start_offset().clone().unwrap_or_default(),
801            SqlServerCdc(split) => split.start_offset().clone().unwrap_or_default(),
802            _ => unreachable!("get_cdc_split_offset() is only for cdc split"),
803        }
804    }
805
806    pub fn into_batch_split(self) -> Option<BatchSourceSplitImpl> {
807        match self {
808            SplitImpl::BatchPosixFs(batch_posix_fs_split) => {
809                Some(BatchSourceSplitImpl::BatchPosixFs(batch_posix_fs_split))
810            }
811            _ => None,
812        }
813    }
814}
815
816impl SplitMetaData for SplitImpl {
817    fn id(&self) -> SplitId {
818        dispatch_split_impl!(self, |inner| inner.id())
819    }
820
821    fn encode_to_json(&self) -> JsonbVal {
822        use serde_json::json;
823        let inner = self.encode_to_json_inner().take();
824        json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
825    }
826
827    fn restore_from_json(value: JsonbVal) -> Result<Self> {
828        let mut value = value.take();
829        let json_obj = value.as_object_mut().unwrap();
830        let split_type = json_obj
831            .remove(SPLIT_TYPE_FIELD)
832            .unwrap()
833            .as_str()
834            .unwrap()
835            .to_owned();
836        let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
837        Self::restore_from_json_inner(&split_type, inner_value.into())
838    }
839
840    fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
841        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset))
842    }
843}
844
845impl SplitImpl {
846    pub fn get_type(&self) -> String {
847        dispatch_split_impl!(self, |_inner| PropType::SOURCE_NAME.to_owned())
848    }
849
850    pub fn update_in_place(&mut self, last_seen_offset: String) -> Result<()> {
851        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset)?);
852        Ok(())
853    }
854
855    pub fn encode_to_json_inner(&self) -> JsonbVal {
856        dispatch_split_impl!(self, |inner| inner.encode_to_json())
857    }
858}
859
860use risingwave_common::types::DataType;
861
/// A column description consisting of a name, a data type and a visibility flag.
#[derive(Clone, Debug)]
pub struct Column {
    /// Name of the column.
    pub name: String,
    /// Data type of the column.
    pub data_type: DataType,
    /// This field is only used by datagen.
    pub is_visible: bool,
}
869
/// Identifier of a split. It is carried by every source message, so it is stored in an
/// `Arc` to make clones cheap instead of copying the string.
pub type SplitId = Arc<str>;
872
/// The message pumped from the external source service.
/// The third-party message structs will eventually be transformed into this struct.
#[derive(Debug, Clone)]
pub struct SourceMessage {
    /// Optional raw key bytes of the message.
    pub key: Option<Vec<u8>>,
    /// Optional raw payload bytes of the message.
    pub payload: Option<Vec<u8>>,
    /// Offset of the message, kept as a string.
    pub offset: String, // TODO: use `Arc<str>`
    /// Id of the split associated with this message (see [`SplitId`]).
    pub split_id: SplitId,
    /// Connector-specific metadata; [`SourceMeta::Empty`] when there is none.
    pub meta: SourceMeta,
}
883
884impl SourceMessage {
885    /// Create a dummy `SourceMessage` with all fields unset for testing purposes.
886    pub fn dummy() -> Self {
887        Self {
888            key: None,
889            payload: None,
890            offset: "".to_owned(),
891            split_id: "".into(),
892            meta: SourceMeta::Empty,
893        }
894    }
895
896    /// Check whether the source message is a CDC heartbeat message.
897    pub fn is_cdc_heartbeat(&self) -> bool {
898        self.key.is_none() && self.payload.is_none()
899    }
900}
901
/// Connector-specific metadata carried in the `meta` field of a [`SourceMessage`].
///
/// Each variant wraps the metadata struct of one connector; [`SourceMeta::Empty`] is used
/// when there is no metadata.
#[derive(Debug, Clone)]
pub enum SourceMeta {
    Kafka(KafkaMeta),
    Kinesis(KinesisMeta),
    Pulsar(PulsarMeta),
    Nexmark(NexmarkMeta),
    GooglePubsub(GooglePubsubMeta),
    Datagen(DatagenMeta),
    DebeziumCdc(DebeziumCdcMeta),
    Nats(NatsMeta),
    /// For sources that do not have metadata.
    Empty,
}
915
916/// Implement Eq manually to ignore the `meta` field.
917impl PartialEq for SourceMessage {
918    fn eq(&self, other: &Self) -> bool {
919        self.offset == other.offset
920            && self.split_id == other.split_id
921            && self.payload == other.payload
922    }
923}
924impl Eq for SourceMessage {}
925
926/// The metadata of a split.
927pub trait SplitMetaData: Sized {
928    fn id(&self) -> SplitId;
929    fn encode_to_bytes(&self) -> Bytes {
930        self.encode_to_json()
931            .as_scalar_ref()
932            .value_serialize()
933            .into()
934    }
935    fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
936        Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
937    }
938
939    /// Encode the whole split metadata to a JSON object
940    fn encode_to_json(&self) -> JsonbVal;
941    fn restore_from_json(value: JsonbVal) -> Result<Self>;
942    fn update_offset(&mut self, last_seen_offset: String) -> crate::error::ConnectorResult<()>;
943}
944
/// [`ConnectorState`] maintains the consuming splits' info. In specific split readers,
/// `ConnectorState` cannot be [`None`] and contains one (for mq split readers) or many (for fs
/// split readers) [`SplitImpl`]s. If no split is assigned to the source executor,
/// `ConnectorState` is [`None`] and the created source stream will be a pending stream.
pub type ConnectorState = Option<Vec<SplitImpl>>;
950
#[cfg(test)]
mod tests {
    use maplit::*;
    use nexmark::event::EventType;

    use super::*;
    use crate::source::cdc::{DebeziumCdcSplit, Mysql};
    use crate::source::kafka::KafkaSplit;

    /// Wrapping a Kafka split in `SplitImpl` and unwrapping it again must give back a split
    /// that encodes exactly like the original.
    #[test]
    fn test_split_impl_get_fn() -> Result<()> {
        let original = KafkaSplit::new(0, Some(0), Some(0), "demo".to_owned());
        let unwrapped = SplitImpl::Kafka(original.clone()).into_kafka().unwrap();
        println!("{:?}", unwrapped);
        assert_eq!(original.encode_to_bytes(), unwrapped.encode_to_bytes());
        assert_eq!(original.encode_to_json(), unwrapped.encode_to_json());

        Ok(())
    }

    /// A MySQL CDC split must survive a round trip through both the bytes and the JSON
    /// encodings.
    #[test]
    fn test_cdc_split_state() -> Result<()> {
        let offset_str = r#"{"sourcePartition":{"server":"RW_CDC_mydb.products"},"sourceOffset":{"transaction_id":null,"ts_sec":1670407377,"file":"binlog.000001","pos":98587,"row":2,"server_id":1,"event":2}}"#;
        let split_impl = SplitImpl::MysqlCdc(DebeziumCdcSplit::<Mysql>::new(
            1001,
            Some(offset_str.to_owned()),
            None,
        ));

        // Both encodings of a restored split must equal those of the original.
        let assert_same_encoding = |restored: &SplitImpl| {
            assert_eq!(split_impl.encode_to_bytes(), restored.encode_to_bytes());
            assert_eq!(split_impl.encode_to_json(), restored.encode_to_json());
        };

        let as_bytes = split_impl.encode_to_bytes();
        let from_bytes = SplitImpl::restore_from_bytes(as_bytes.as_ref())?;
        assert_same_encoding(&from_bytes);

        let as_json = split_impl.encode_to_json();
        let from_json = SplitImpl::restore_from_json(as_json)?;
        assert_same_encoding(&from_json);

        Ok(())
    }

    #[test]
    fn test_extract_nexmark_config() {
        let options = convert_args!(btreemap!(
            "connector" => "nexmark",
            "nexmark.table.type" => "Person",
            "nexmark.split.num" => "1",
        ));

        let extracted =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(options), true)
                .unwrap();

        let ConnectorProperties::Nexmark(nexmark) = extracted else {
            panic!("extract nexmark config failed");
        };
        assert_eq!(nexmark.table_type, Some(EventType::Person));
        assert_eq!(nexmark.split_num, 1);
    }

    #[test]
    fn test_extract_kafka_config() {
        let options = convert_args!(btreemap!(
            "connector" => "kafka",
            "properties.bootstrap.server" => "b1,b2",
            "topic" => "test",
            "scan.startup.mode" => "earliest",
            "broker.rewrite.endpoints" => r#"{"b-1:9092":"dns-1", "b-2:9092":"dns-2"}"#,
        ));

        let extracted =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(options), true)
                .unwrap();

        let ConnectorProperties::Kafka(kafka) = extracted else {
            panic!("extract kafka config failed");
        };
        let expected_rewrite_map = btreemap! {
            "b-1:9092".to_owned() => "dns-1".to_owned(),
            "b-2:9092".to_owned() => "dns-2".to_owned(),
        };
        assert_eq!(
            kafka.privatelink_common.broker_rewrite_map,
            Some(expected_rewrite_map)
        );
    }

    #[test]
    fn test_extract_cdc_properties() {
        let user_props_mysql = convert_args!(btreemap!(
            "connector" => "mysql-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "3306",
            "database.user" => "root",
            "database.password" => "123456",
            "database.name" => "mydb",
            "table.name" => "products",
        ));

        let user_props_postgres = convert_args!(btreemap!(
            "connector" => "postgres-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "5432",
            "database.user" => "root",
            "database.password" => "654321",
            "schema.name" => "public",
            "database.name" => "mypgdb",
            "table.name" => "orders",
        ));

        let mysql_extracted = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_mysql),
            true,
        )
        .unwrap();
        let ConnectorProperties::MysqlCdc(mysql) = mysql_extracted else {
            panic!("extract cdc config failed");
        };
        for (key, expected) in [
            ("database.hostname", "127.0.0.1"),
            ("database.port", "3306"),
            ("database.user", "root"),
            ("database.password", "123456"),
            ("database.name", "mydb"),
            ("table.name", "products"),
        ] {
            assert_eq!(mysql.properties.get(key).unwrap(), expected);
        }

        let postgres_extracted = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_postgres),
            true,
        )
        .unwrap();
        let ConnectorProperties::PostgresCdc(postgres) = postgres_extracted else {
            panic!("extract cdc config failed");
        };
        for (key, expected) in [
            ("database.hostname", "127.0.0.1"),
            ("database.port", "5432"),
            ("database.user", "root"),
            ("database.password", "654321"),
            ("schema.name", "public"),
            ("database.name", "mypgdb"),
            ("table.name", "orders"),
        ] {
            assert_eq!(postgres.properties.get(key).unwrap(), expected);
        }
    }
}